[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-11-22 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689206#comment-15689206
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~kexianda]:  

{quote}
I have a question: if the user did not set the RequestedParallelism in the
script, what is the default requestedParallelism? Do we have to handle the
default parallelism at runtime?
{quote}
{code}
public static int getParallelism(List<RDD<Tuple>> predecessors,
        PhysicalOperator physicalOperator) {

    int numReducers = SparkEnv.SPARK_REDUCERS;
    if (numReducers != 0) {
        return numReducers;
    }

    int parallelism = physicalOperator.getRequestedParallelism();
    if (parallelism <= 0) {
        // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
        // is reasonable.
        parallelism = predecessors.get(0).context().defaultParallelism();
    }

    return parallelism;
}
{code}

From the above code you can see that the parallelism is decided first by 
SparkEnv.SPARK_REDUCERS (see PIG-5068), second by the physical operator's 
requestedParallelism, and third by the default parallelism of the parent RDD's 
SparkContext.

> Optimize sort case when data is skewed
> --
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5029.patch, PIG-5029_2.patch, PIG-5029_3.patch, 
> PIG-5051_5029_5.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan will be converted to a Spark plan with the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
> |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
> |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
> |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey, and RangePartitioner samples the data and splits the 
> keys into roughly equal ranges, the test result (attached document) shows that 
> one partition receives most of the keys and takes a long time to finish.





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-11-22 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688696#comment-15688696
 ] 

Xianda Ke commented on PIG-5029:


Hi [~kellyzly],

The salted key solution seems OK. JDK's Random is a pseudorandom number generator 
(PRNG), also known as a deterministic random bit generator (DRBG). If the seed 
(the task id) is the same, the generated random number sequence will be the same.
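
A minimal sketch of that property (the literal 7 just stands in for a task id):
{code}
// Two Randoms seeded with the same value agree on every draw.
java.util.Random r1 = new java.util.Random(7);
java.util.Random r2 = new java.util.Random(7);
for (int i = 0; i < 5; i++) {
    System.out.println(r1.nextInt() == r2.nextInt());   // always prints true
}
{code}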

I have a question: if the user did not set the RequestedParallelism in the
script, what is the default requestedParallelism? Do we have to handle the
default parallelism at runtime?



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-10-31 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624556#comment-15624556
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~knoguchi]:  

Earlier we discussed the skewed key sort issue in spark mode at length. I proposed 
the PIG-5029_1.patch, which appends a random integer to the original key so that 
skewed keys are distributed evenly (the [salted key 
solution|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska]). 
You mentioned some issues this patch brings, such as duplicated and missing 
results. Later [~tgraves] commented that
{quote}
I'm assuming in the presentation they are using a salt key such that it 
generates the same random numbers if the map task is rerun, like Koji mentioned. For 
instance, can you use the task id (not the attempt) so that the key is always the 
same if it re-runs.
{quote}
So now, in PIG-5051_5029_5.patch, I generate the random number with 
{noformat}new Random(seed){noformat}, where the value of {noformat}seed{noformat} is 
PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX). Thus the 
random integers generated in different task attempts of a Spark task are the same 
if retries happen, because the value of 
PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX) stays the same.
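
A minimal sketch of the idea (illustrative only, not the actual patch code):
{code}
// Seed the salt generator with the task index (not the attempt id), so that a
// retried attempt of the same task regenerates exactly the same salt sequence.
int taskIndex = Integer.parseInt(
        PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
java.util.Random salt = new java.util.Random(taskIndex);
// append the salt to the original sort key before range partitioning
int saltForNextRecord = salt.nextInt();
{code}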

Do you think this solution is OK? If so, [~kexianda] will start to review this 
patch.

Thanks again for [~knoguchi]'s patience on this JIRA; this is a really 
interesting issue.





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-28 Thread Koji Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530152#comment-15530152
 ] 

Koji Noguchi commented on PIG-5029:
---

{quote}
bq. how is pig handling skew for MR/TEZ?
Sampling is done and partitioning is done based on the weights assigned to the 
repeating keys in the sample.
{quote}
When we really need Random(), we seed it based on the task id with some shifting 
so that subsequent tasks won't get similar seeds.
Example: https://issues.apache.org/jira/browse/PIG-4819
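
A rough sketch of that idea (the exact scheme in PIG-4819 may differ; taskId and 
jobSeed are illustrative names):
{code}
// Shift/mix the task id so that adjacent task ids do not yield similar seeds.
long seed = ((long) taskId << 32) ^ jobSeed;
java.util.Random random = new java.util.Random(seed);
{code}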



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-28 Thread Rohini Palaniswamy (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530114#comment-15530114
 ] 

Rohini Palaniswamy commented on PIG-5029:
-

bq. how is pig handling skew for MR/TEZ?
Sampling is done, and partitioning is based on the weights assigned to the 
repeating keys in the sample.



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-28 Thread Koji Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530104#comment-15530104
 ] 

Koji Noguchi commented on PIG-5029:
---

Thanks a million [~tgraves].



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-28 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530065#comment-15530065
 ] 

Thomas Graves commented on PIG-5029:


Is the question whether Spark supports map tasks that aren't idempotent, i.e. if a 
task runs again it could generate different output? If so, I think that is a bad 
assumption even if it did. Maps should be idempotent because you have things like 
speculative execution and the failure cases you have talked about that rely, or 
could rely, on this.

Spark does try to be intelligent about rerunning stages, so if you get a 
fetchFailure of a map after one of the reducers has already finished, it will 
go rerun that map and then only rerun the reducers that haven't finished.  So 
if the map output is different you could end up with different results across 
those reducers.

The external shuffle service doesn't necessarily solve this problem. It helps, but 
if the node that has the map output goes away, the output would have to be 
recomputed.

I'm assuming in the presentation they are using a salt key such that it 
generates the same random numbers if the map task is rerun, like Koji mentioned. For 
instance, can you use the task id (not the attempt) so that the key is always the 
same if it re-runs. [~rohini] how is pig handling skew for MR/TEZ?




[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-27 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528532#comment-15528532
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~knoguchi]: thanks for your patience on this JIRA. I will enable the Spark shuffle 
service so that the reduce tasks (in Spark) read the persisted shuffle temp output, 
which avoids recomputing the map task (the recomputation is what causes the 
redundant data).
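
Roughly, enabling the external shuffle service means settings along these lines 
(assuming a YARN deployment; these are the standard Spark/YARN property names):
{code}
# spark-defaults.conf (or --conf on spark-submit)
spark.shuffle.service.enabled=true

# yarn-site.xml on every NodeManager, so shuffle files outlive the executor
yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
{code}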



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-27 Thread Koji Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15526775#comment-15526775
 ] 

Koji Noguchi commented on PIG-5029:
---

[~kellyzly], I believe I explained how a purely RANDOM key would break mapreduce 
(and Tez). Note that a map task is not flagged as failed or killed when the first 
reducer pulls the map output successfully. It's only failed/killed and retried 
when the second reducer tries to pull the output and fails.

[~tgraves], maybe you can close the gap for us? I don't know Spark well enough to 
say whether the same issue applies to Spark or not.

If you can persist the temp output (as suggested by [~vanzin]), that would work 
too.





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-26 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524845#comment-15524845
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~vanzin]: I guess what [~rohini] and [~knoguchi] worry about is whether the same 
sequence will be generated after retrying the given task, and whether Spark will 
*not* remove the temporary output of the previous failed task but instead aggregate 
that temporary result into the final result, causing redundant results.



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-26 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524832#comment-15524832
 ] 

Marcelo Vanzin commented on PIG-5029:
-

I'm not sure I understand the question. If you're worried about executors 
dying, use an external shuffle service. Then shuffle files will be available 
regardless of executor state.



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-26 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524780#comment-15524780
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~rohini] and [~knoguchi]: thanks for your patience on this jira.
bq. The key is making Random less random so that it'll produce the same sequence of 
values when retried for the given task.

You mean that different keys which do not belong to the same partition may be 
distributed to the same partition after we use (key + random integer), once the 
given task is retried; is my understanding right?

If my understanding is right, I wonder why the salted key (appending a random 
value to the key) is widely used in 
[HBase|https://sematext.com/blog/2012/04/09/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/]
 and was even proposed at [Spark Summit 
2016|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska]
 to distribute keys evenly, if the salted key causes problems in the retried-task 
case.



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-26 Thread Koji Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524159#comment-15524159
 ] 

Koji Noguchi commented on PIG-5029:
---

[~rohini], can you take this? Obviously I'm not explaining it well. 
The key is making Random less random so that it'll produce the same sequence of 
values when retried for the given task.



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-25 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15521883#comment-15521883
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~vanzin]: Thanks for your comment. I have a question about using a salted key to 
solve the skewed data problem described in the above link: will *redundant* data be 
generated? For example, we salt the key (append a random integer to make a new 
key) and transform the key through several RDD transformations; the Spark 
job/stage/task fails because of a fetch failure or node failure, the temporary 
output is still saved on disk, and Spark retries the task. Will Spark aggregate the 
*temporary output* which was generated by the previously failed task into the final 
result? I think this will *not* happen, because Spark removes the temporary output 
if it [fails to fetch map 
outputs|https://github.com/apache/spark/blob/649fa4bf1d6fc9271ae56b6891bc93ebf57858d1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1278].
 Can you double-confirm this, because [~knoguchi] proposed that the salted key 
solution (appending a random integer to make a new key so that keys are distributed 
more evenly) will generate redundant data after a fetch failure?

[~knoguchi]:
As for a node failure causing redundant keys, I think Spark will rerun the task on 
other nodes and will not aggregate the temporary output on the failed node into the 
final result.
As for missing data, I think this will *only* happen when we use the random integer 
as the *only* key of the tuple (from the case you provided in PIG-3257).





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-23 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516915#comment-15516915
 ] 

Marcelo Vanzin commented on PIG-5029:
-

Not really my area of expertise; but this reminds me of Ted Malaska's talk 
(http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska,
 starts at slide 40), maybe that can be used (sort of sounds similar to the 
weighted partitioner).



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-22 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515605#comment-15515605
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~kexianda], [~mohitsabharwal], [~pallavi.rao] and [~xuefuz]:

Currently I encounter a problem when using RDD.sortByKey() on skewed data, even 
though RangePartitioner is used by RDD.sortByKey.
Here is an example to explain the problem.
{code}
val rdd = sc.parallelize(1.to(2000)).map(x => (x / 2000, x))
val numPartitions = 2
val partitioner = new org.apache.spark.RangePartitioner(numPartitions, rdd)
val numsOfPartitions = rdd.keys.map(k => partitioner.getPartition(k)).countByValue()
numsOfPartitions.map(pair => println("key:" + pair._1 + " val:" + pair._2))
{code}

{code}
The Result:
Key:0   val:1999
Key:1   val:1
{code}

As the code generates data like [(0,1),(0,2),……,(0,1999),(1,2000)], we use 
RangePartitioner to partition the data. Although RangePartitioner is said to 
partition sortable records by range into roughly equal ranges, it seems that it 
does not distribute skewed keys into different partitions evenly.


After reviewing the code of RangePartitioner, I found that although 
RangePartitioner first samples the data, then orders the sample, and then defines 
the range of each partition so that keys go to the correct partition (here 
"correct" means that all elements in partition (i-1) are smaller than the specified 
key and all elements in partition (i+1) are bigger than the specified key), 
RangePartitioner cannot guarantee that keys are distributed evenly.

After reviewing the Pig code WeightedRangePartitioner, I found that it could handle 
this kind of case. The difference between WeightedRangePartitioner and 
RangePartitioner is that WeightedRangePartitioner also considers the weight of each 
element: if an element occupies a large share of the whole data, it will occupy 
several partitions (for example, 7 of 40 partitions), not only 1 partition.
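
A minimal sketch of the weighted idea only (not Pig's actual WeightedRangePartitioner 
code):
{code}
// A key whose sampled weight is large is assigned several partitions in proportion
// to that weight, and its records are then spread across those partitions.
int partitionsForKey(double keyWeight, int totalPartitions) {
    // e.g. keyWeight = 7/40 of the sample and totalPartitions = 40 gives 7 partitions
    return Math.max(1, (int) Math.round(keyWeight * totalPartitions));
}
{code}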

In order to solve this case, I have tried the following solution (append a random 
value to the key so that keys are distributed evenly) and then removed the random 
value from the result, like:
{code}
val random = new java.util.Random()
val rdd = sc.parallelize(1.to(2000)).map(x => ((x, random.nextInt()), x))
val numPartitions = 2
val partitioner = new org.apache.spark.RangePartitioner(numPartitions, rdd)
val numsOfPartitions = rdd.keys.map(k => partitioner.getPartition(k)).countByValue()
numsOfPartitions.map(pair => println("key:" + pair._1 + " val:" + pair._2))
val newRdd = rdd.map(p => (p._1._1, p._2))
{code}
{code}
The Result:
Key:0   val:999
Key:1   val:1001
{code}
It shows that the 2000 elements are distributed evenly over the 2 partitions with 
this solution. But we *cannot* use this solution, as people from the community have 
suggested, because it will cause *redundant* and *missing* data if the Spark job 
retries after a failure.

My question is:
Is there any better way to solve the skewed key sort case in Spark?




[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-19 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505624#comment-15505624
 ] 

Xianda Ke commented on PIG-5029:


[~kellyzly],

When a task re-runs, the partitioning is not the same as in the first run because 
of the random int: some records may be duplicated and some may be missed.

For your information:
in SkewedJoin (PIG-4858), records are sent to the reducers in a round-robin 
fashion. That partitioning is not random, but it is even.
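
An illustrative sketch of the round-robin idea only (not the PIG-4858 code; the 
names are made up):
{code}
// Records of a skewed key are dealt to that key's reducer range in turn, which is
// deterministic for a given record order and spreads the key evenly.
int nextPartition(int recordIndex, int firstReducer, int numReducersForKey) {
    return firstReducer + (recordIndex % numReducersForKey);
}
{code}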





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-19 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505394#comment-15505394
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~knoguchi]:
{quote}
If node goes down after reducer0_attempt0 pulled map output but before 
reducer1_attempt0 started pulling, then map output needs to be re-computed.
{quote}
Won't Hadoop delete the outputs before recovery (i.e., before creating a new task 
attempt to recompute)?
 





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-19 Thread Koji Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505381#comment-15505381
 ] 

Koji Noguchi commented on PIG-5029:
---

bq. Hadoop will try new task attempt only when last task attempt fail

As I wrote in the original comment, " (could be fetch failure or node 
failure)." 
If node goes down after reducer0_attempt0 pulled map output but before 
reducer1_attempt0 started pulling, then map output needs to be _re-computed_.



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-19 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505358#comment-15505358
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~knoguchi]: if this has nothing to do with speculative execution, why, after 
reducer0_attempt0 successfully finishes pulling mapper0_attempt0's outputs and 
produces all n records, would reducer1_attempt0 start to pull the same n records 
from mapper0_attempt0? Hadoop will try a [new task 
attempt|https://github.com/apache/hadoop/blob/819224dcf9c683aa52f58633ac8e13663f1916d8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java#L647]
 only when the last task attempt fails. Is there anything I misunderstand?



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-19 Thread Koji Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503610#comment-15503610
 ] 

Koji Noguchi commented on PIG-5029:
---

[~kellyzly], this has nothing to do with speculative execution.





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-17 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15500402#comment-15500402
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~knoguchi]:  Thanks for your reply.
Here is a question about the example you provided in PIG-3257.
{code}
A = load ...
B = group A by UUID();
C = foreach B...
{code}
{quote}This job could successfully finish with output ranging from 0 to 2n 
records.
For example, sequence of events can be,
mapper0_attempt0 finish with n outputs and say all n uuid keys were assigned to 
reducer0.
reducer0_attempt0 pulls map outputs and produces n outputs.
reducer1_attempt0 tries to pull mapper0_attempt0 output and fail. (could be 
fetch failure or node failure).
mapper0_attempt1 rerun. And this time, all n uuid keys were assigned to 
reducer1.
reducer1_attempt0 pulls mapper0_attempt1 output and produces n outputs.
job finish successfully with 2n outputs.
This is certainly unexpected to users.
{quote}
My question is:
1. Do reducer0_attempt0 and reducer1_attempt0 both pull the map output and get 
ready to produce n outputs because of the speculative-execution mechanism?
2. If it is because of speculative execution, then after reducer0_attempt0 
finishes producing its n outputs, Hadoop will cancel reducer1_attempt0 (because 
reducer0_attempt0 succeeded), so there should be no possibility of generating 
2n outputs.





[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-14 Thread Rohini Palaniswamy (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15490809#comment-15490809
 ] 

Rohini Palaniswamy commented on PIG-5029:
-

bq. Although spark will sample the data automatically before sort but in the 
benchmark test, the effect is bad
  You should file a spark jira as well so that they can fix their sampling 
logic.

bq. Now i'm investigating to use SkewedPartitioner
  That is for skewed join. For order by it is WeightedRangePartitioner.
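
For reference, a rough plain-Java sketch of the weighted-range idea behind 
Pig's order-by (this is deliberately not the real WeightedRangePartitioner 
code, and the key and range values are made up): keys found to be heavy during 
the sampling pass are assigned a range of reducers instead of a single one, and 
each record of such a key is dealt to one reducer in that range, so no single 
reducer receives the whole skewed key.
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Rough sketch only -- not Pig's WeightedRangePartitioner.
public class WeightedRangeSketch {
    private final Map<String, int[]> heavyKeyRanges = new HashMap<>();   // key -> {first, last} reducer
    private final Map<String, AtomicInteger> counters = new HashMap<>();
    private final int numReducers;

    WeightedRangeSketch(int numReducers) {
        this.numReducers = numReducers;
        // assumption for this example: sampling showed "query_term_A" holds
        // about half of the records, so it gets reducers 0..numReducers/2-1
        heavyKeyRanges.put("query_term_A", new int[]{0, numReducers / 2 - 1});
    }

    int getPartition(String key) {
        int[] range = heavyKeyRanges.get(key);
        if (range == null) {
            // simplified: a real order-by partitioner routes light keys by the
            // sampled range bounds to preserve global order, not by hashing
            return Math.floorMod(key.hashCode(), numReducers);
        }
        // heavy keys are dealt round-robin across their reducer range
        int width = range[1] - range[0] + 1;
        int next = counters.computeIfAbsent(key, k -> new AtomicInteger()).getAndIncrement();
        return range[0] + (next % width);
    }
}
{code}
Pig's real implementation picks the target reducer for a heavy key with a 
weighted probability rather than a round-robin counter, which is exactly where 
the randomness-vs-retries concern raised below comes in.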



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-14 Thread Koji Noguchi (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15490595#comment-15490595
 ] 

Koji Noguchi commented on PIG-5029:
---

bq. Can you explain more about why this cause data loss and duplication? 

I mentioned pure randomness hurting results once in 
https://issues.apache.org/jira/browse/PIG-3257?focusedCommentId=13669195l#comment-13669195

Also, I had to rewrite the RANDOM udf later in PIG-4819.
And now, we're finding a similar issue with the RANDOM() call inside 
DiscreteProbabilitySampleGenerator, which is used by Pig's order-by. (Jira 
and patch to come soon.)

If Spark has any kind of retry on failure, then you probably want to avoid 
pure randomness.
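
If a salt is still wanted, one way to keep the benefit without pure randomness 
(just a hypothetical sketch, not an existing Pig or Spark API) is to derive the 
salt deterministically from the record itself, so a re-executed attempt 
regenerates exactly the same salted keys as the first attempt:
{code}
// Hypothetical sketch: a deterministic "salt" for spreading a skewed key.
// The same record always yields the same salt, so a task re-executed after a
// fetch or node failure produces identical shuffle output.
public class DeterministicSalt {
    static int saltFor(String key, String wholeRecord, int fanout) {
        // stable hash of the record contents instead of new Random().nextInt(fanout)
        return Math.floorMod((key + "\u0000" + wholeRecord).hashCode(), fanout);
    }

    public static void main(String[] args) {
        String record = "query_term_A\tuser42\t12\t...";
        int first  = saltFor("query_term_A", record, 8);
        int second = saltFor("query_term_A", record, 8);
        System.out.println(first == second);  // always true, so reruns are repeatable
    }
}
{code}
A counter scoped to the input split (split index plus record offset) would be 
another deterministic choice; the only property that matters is that attempt N 
and attempt N+1 of the same task compute the same salt for the same record.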



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-13 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15489116#comment-15489116
 ] 

liyunzhang_intel commented on PIG-5029:
---

[~rohini] and [~knoguchi]: Thanks for your interest in this skewed-key sort problem.
[~rohini]:
bq. Pig has always handled skew automatically during order by.
Yes, in MR mode Pig will sample, partition and sort. In Spark mode we just use 
RDD.sortByKey to implement the sort feature. Although Spark samples the data 
automatically before sorting, in the benchmark test the effect is bad (a 
skewed key ends up in one partition, so one part- output file is large while 
the others are small).
bq. Using Random can cause data loss and duplication during reruns and should be 
avoided at all costs. We have been bitten by this a lot of times.
This is the only solution I have found so far, and it shows a good performance 
improvement. Can you explain more about why this causes data loss and 
duplication?
Now I'm investigating using 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner
 which makes the skewed key distributed evenly in Spark mode.
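
To make that symptom concrete, here is a small standalone sketch against the 
Spark Java API (not Pig code; the skew ratio and key names are made up) showing 
that sortByKey's range partitioning keeps every record of one dominant key in a 
single partition:
{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

// Standalone demo of the skew symptom: ~90% of records share one key, and
// after sortByKey that key's records all sit in the same partition.
public class SortSkewDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SortSkewDemo").setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> data = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            String key = (i % 10 == 0) ? "term_" + (i % 100) : "hot_term";
            data.add(new Tuple2<>(key, i));
        }

        JavaPairRDD<String, Integer> sorted =
                sc.parallelizePairs(data, 4).sortByKey(true, 4);

        // glom() turns each partition into a list so the imbalance is visible
        List<List<Tuple2<String, Integer>>> partitions = sorted.glom().collect();
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition " + p + " -> " + partitions.get(p).size() + " records");
        }
        sc.stop();
    }
}
{code}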




[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-13 Thread Rohini Palaniswamy (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15488515#comment-15488515
 ] 

Rohini Palaniswamy commented on PIG-5029:
-

[~knoguchi] just pointed out this jira to me, as there is a plan to use random 
numbers.

bq. set an option "pig.spark.skewed.sort" in pig.properties and can be enabled 
if user consider the sorted data is distributed skewed.
  This should be the default behavior and not a configuration for the user. Pig 
has always handled skew automatically during order by.

bq.  append a random integer to the sorted tuple like (1,(xxx)) to (1,(xxx), 
random integer)(SortConverter.ToKeyValueFunction).
  Using Random can cause data loss and duplication during reruns and should be 
avoided at all costs. We have been bitten by this a lot of times.



[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-13 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486676#comment-15486676
 ] 

liyunzhang_intel commented on PIG-5029:
---

The solution for the skewed-data sort in PIG-5029.patch is as follows (a small 
plain-Java sketch of the idea is given after the list):
0. Add an option "pig.spark.skewed.sort" in pig.properties which can be enabled 
if the user considers the data being sorted to be skewed.
1. Append a random integer to the sorted tuple, turning (1,(xxx)) into 
(1,(xxx), random integer) (SortConverter.ToKeyValueFunction).
2. Add a new class SkewedKeyComparator in SortConverter, which influences the 
partitioning in the shuffle phase of RDD.sortByKey. Before, the partitions were 
not distributed evenly because of the skewed key; now the combined key 
(key, random integer) is used for partitioning, so the partitions are no longer 
skewed.
3. The last random integer is removed from the tuple in 
SortConverter.ToValueFunction.
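
A minimal plain-Java sketch of the salted-key idea in steps 1-3 (class, field 
and method names here are illustrative only; the actual change lives in 
SortConverter and is not reproduced here):
{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Illustrative sketch only -- not the PIG-5029.patch code.
public class SaltedSortSketch {
    // step 1: (key) -> (key, salt) so one skewed key no longer collapses
    // into a single shuffle partition
    static class SaltedKey {
        final String key;
        final int salt;
        SaltedKey(String key, int salt) { this.key = key; this.salt = salt; }
    }

    // step 2: compare by the real key first, then by the salt, so the global
    // ordering of the real key is preserved
    static final Comparator<SaltedKey> SKEWED_KEY_COMPARATOR =
            Comparator.<SaltedKey, String>comparing(k -> k.key)
                      .thenComparingInt(k -> k.salt);

    public static void main(String[] args) {
        Random random = new Random();
        List<SaltedKey> keys = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            keys.add(new SaltedKey("hot_term", random.nextInt(100)));   // skewed key
            keys.add(new SaltedKey("term_" + i, random.nextInt(100)));  // light keys
        }
        keys.sort(SKEWED_KEY_COMPARATOR);

        // step 3: drop the salt again when emitting the sorted output
        keys.forEach(k -> System.out.println(k.key));
    }
}
{code}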
