[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-09-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/20414
  
Hi, @jiangxb1987. Could you close this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-07-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-07-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93558/


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-07-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20414
  
**[Test build #93558 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93558/testReport)**
 for PR 20414 at commit 
[`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-07-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20414
  
**[Test build #93558 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93558/testReport)**
 for PR 20414 at commit 
[`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-29 Thread sameeragarwal
Github user sameeragarwal commented on the issue:

https://github.com/apache/spark/pull/20414
  
Thanks @mridulm, all great points!


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-29 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20414
  
Ouch... Yea, we have to figure out a way to make it deterministic under hash 
collisions.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-29 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/20414
  
@jiangxb1987 You are correct when the sizes of the maps are the same.
But if the map sizes are different, the resulting order can be different, 
which can happen when requests for additional memory follow different patterns 
on re-execution (triggering spills).
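To make the point about map sizes concrete, here is a toy model (hypothetical, not Spark's `AppendOnlyMap`, which also rehashes keys): an open-addressing table whose slot is `hash & (capacity - 1)`. The same keys, inserted in the same order, come out in a different iteration order once the table has a different capacity, e.g. because a re-executed task grew or spilled its table at a different point.

```scala
object ToyTableOrder {
  // Toy open-addressing hash table: slot = hash & (capacity - 1), linear probing.
  // Hypothetical model for illustration only; NOT Spark's AppendOnlyMap.
  // Returns keys in slot order, i.e. the order an iterator over the table would see.
  def iterationOrder(keys: Seq[Int], capacity: Int): Seq[Int] = {
    require(capacity > 0 && Integer.bitCount(capacity) == 1, "capacity must be a power of two")
    val distinct = keys.distinct
    require(distinct.size <= capacity, "table too small for the keys")
    val mask = capacity - 1
    val slots = Array.fill[Option[Int]](capacity)(None)
    for (k <- distinct) {
      var pos = k.hashCode & mask
      while (slots(pos).isDefined) pos = (pos + 1) & mask // linear probing on collision
      slots(pos) = Some(k)
    }
    slots.toSeq.flatten
  }

  def main(args: Array[String]): Unit = {
    // Same keys, same insertion order, different table sizes.
    println(iterationOrder(Seq(9, 2), capacity = 8))  // 9 lands in slot 1, 2 in slot 2
    println(iterationOrder(Seq(9, 2), capacity = 16)) // 2 lands in slot 2, 9 in slot 9
  }
}
```

A stable sort applied afterwards cannot undo this: it preserves whatever order the table iteration produced.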


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-29 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20414
  
Hey, I searched the `ExternalAppendOnlyMap` and here are the findings:
The `ExternalAppendOnlyMap` claims it keeps the content sorted, but it 
actually uses a `HashComparator` that compares the elements by their hashes. 
Luckily, it sorts the elements using TimSort, which is stable; that means even 
if there are hash collisions, the output sequence should still be 
deterministic as long as the inputs are (which we can achieve by modifying 
`ShuffleBlockFetcherIterator` per the previous discussion).

We may still need to check all the other places where we spill/compare 
objects, though, to ensure we generate a deterministic output sequence everywhere.
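A minimal Scala sketch of the stability argument above, assuming a plain sort keyed only on `hashCode` (a hypothetical stand-in, not the actual `HashComparator` code): `Seq.sortBy` is stable, so keys whose hashes collide keep their input order. `"Aa"` and `"BB"` are used because both have `hashCode` 2112 on the JVM.

```scala
object StableHashSort {
  // Orders records only by hashCode, like a hash-based comparator would.
  // Seq.sortBy is a stable sort, so colliding keys keep their relative input order.
  def sortByHash(records: Seq[String]): Seq[String] =
    records.sortBy(_.hashCode)

  def main(args: Array[String]): Unit = {
    // "Aa" and "BB" collide, so their relative order simply follows the input.
    println(sortByHash(Seq("Aa", "x", "BB"))) // List(x, Aa, BB)
    println(sortByHash(Seq("BB", "x", "Aa"))) // List(x, BB, Aa)
  }
}
```

The output is deterministic only if the input order is, which is why the fetch order still matters.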


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-29 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/20414
  
@jiangxb1987 Unfortunately, I am unable to analyze this in detail, but 
hopefully I can give some pointers that help!

One example I can think of is a shuffle that uses an Aggregator (like 
combineByKey), via ExternalAppendOnlyMap.
The order in which we replay keys with the same hash is non-deterministic, 
from what I remember: for example, if the first run did not result in any 
spills, the second run had 3 spills and the third run had 7, the order of keys 
(with the same hash) could be different in each.

Similarly, with sort-based shuffle, depending on the length of the data 
array in AppendOnlyMap (which is determined by whether we spilled or not), we 
can get different sort orders?
Similarly, for the actual sort itself, `merge` is quite clearly 
sensitive to the number of spills (for example, when there is no aggregator or 
ordering, it is simply `iterators.iterator.flatten`).

There might be other cases where this is happening; unfortunately, I have 
not looked at this part of the codebase regularly in a while.

Please note that in all the cases above, no ordering is defined.
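A toy illustration of the `merge` concern, assuming each spill file is already sorted by key hash and the runs are merged with a heap keyed only on that hash (`SpillMergeSketch` is hypothetical; Spark's actual merge code is not reproduced here). `scala.collection.mutable.PriorityQueue` makes no promise about the order of equal elements, so keys with colliding hashes that sit in different runs can come out in either order, and how the records were split into runs (i.e. the number of spills) feeds into that tie-break.

```scala
import scala.collection.mutable

object SpillMergeSketch {
  // Heap entries are (current head record, iterator over the rest of that run).
  // Reversed so the PriorityQueue (a max-heap) yields the smallest hash first.
  private val byHeadHash: Ordering[(String, Iterator[String])] =
    Ordering.by[(String, Iterator[String]), Int](_._1.hashCode).reverse

  // Merges runs that are each sorted by hashCode, comparing ONLY hashes.
  // Ties between equal hashes are resolved by the heap, not by input order.
  def mergeByHash(runs: Seq[Seq[String]]): Seq[String] = {
    val heap = mutable.PriorityQueue.empty[(String, Iterator[String])](byHeadHash)
    for (run <- runs) {
      val it = run.iterator
      if (it.hasNext) heap.enqueue((it.next(), it))
    }
    val out = mutable.ArrayBuffer.empty[String]
    while (heap.nonEmpty) {
      val (head, rest) = heap.dequeue()
      out += head
      if (rest.hasNext) heap.enqueue((rest.next(), rest))
    }
    out.toSeq
  }
}
```

The merged output is always sorted by hash, but the relative order of `"Aa"` and `"BB"` (equal hashes) is not something the merge pins down.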


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-29 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20414
  
@mridulm I also agree we should follow @sameeragarwal's suggestion to let 
shuffle fetch produce deterministic output, and only do this for a few 
operations (e.g. repartition/zipWithIndex; do we have more?). IIUC spill should 
NOT affect the result, but if you find anything suspicious, please kindly share 
it with us. :)


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-29 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/20414
  
@shivaram Thinking more, this might affect everything that does a zip (or 
variants/similar idioms like limit K, etc.) on a partition, with random + index 
in coalesce + shuffle=true being one special case.

Essentially, anything that assumes the order of records in a partition 
will always be the same is affected. Currently, only the following should 
guarantee this; others need not:
* Reading from an external immutable source like HDFS, etc. (including 
checkpoint)
* Reading from the block store
* Sorted partitions
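A small sketch of why zip-style idioms need a stable per-partition order: the index a record gets, and which records survive a "take the first K", depend only on iteration order. `ZipIndexSketch` is a hypothetical helper, and it assumes distinct records so the result can be a `Map`.

```scala
object ZipIndexSketch {
  // Assigns each record its position within the partition, as a per-partition
  // zipWithIndex would. Assumes distinct records (duplicates collapse in the Map).
  def indexRecords(partition: Seq[String]): Map[String, Int] =
    partition.zipWithIndex.toMap

  // "limit K"-style idiom: keeps whichever K records come first in iteration order.
  def firstK(partition: Seq[String], k: Int): Seq[String] =
    partition.take(k)
}
```

If a retried task sees `Seq("b", "a")` where the first attempt saw `Seq("a", "b")`, record `"a"` is assigned index 1 instead of 0, and any downstream stage that already consumed the first attempt's output now disagrees with the retry.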

The more I think about it, the more I like @sameeragarwal's suggestion in 
#20393: a general solution for this could be to introduce deterministic output 
for shuffle fetch, which, when enabled, takes a more expensive but repeatable 
iteration of shuffle fetch.
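One way to read "deterministic output for shuffle fetch", sketched under the assumption that each fetched block is tagged with the index of the map task that produced it (`FetchedBlock` is a hypothetical type, not Spark's): concatenate blocks in map-index order instead of arrival order. The extra cost is that a block can no longer be consumed the moment it arrives.

```scala
object DeterministicFetchSketch {
  // A fetched shuffle block: one map task's output for this reducer (hypothetical type).
  final case class FetchedBlock(mapIndex: Int, records: Seq[String])

  // Arrival-order read: records appear in whatever order the network delivered blocks.
  def arrivalOrderRead(arrived: Seq[FetchedBlock]): Seq[String] =
    arrived.flatMap(_.records)

  // Deterministic read: reorder blocks by map index before concatenating, so the
  // output depends only on the map outputs, not on fetch timing.
  def deterministicRead(arrived: Seq[FetchedBlock]): Seq[String] =
    arrived.sortBy(_.mapIndex).flatMap(_.records)
}
```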

This assumes that Spark shuffle is always repeatable given the same input (I 
have yet to look into this in detail when spills are involved; any thoughts, 
@sameeragarwal?), which could be an implementation detail; but we could make 
it a requirement for shuffle.

Note that we might be able to avoid this additional cost for most of the 
current use cases (otherwise we would have faced this problem 2 major releases 
ago!), so the actual user impact hopefully might not be as high.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-28 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20414
  
@cloud-fan Yea, you provide a clearer statement here, and I totally agree!


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-28 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20414
  
> Not quite - coalesce will not combine partitions across executors (aka 
shuffle) so you could still end up having many many files.

I'm not sure I follow here. For `coalesce(1)`, Spark just launches a 
single task to handle all the data partitions. If the final action is saving a 
file, we still have only one file at the end. Compared to `repartition(1)`, I 
think the only difference is the cost of a task retry.

I think we can special-case `repartition(1)`: if there is only one reducer, 
we don't need to add the local sort.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-28 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20414
  
@felixcheung You are right; I didn't make it clear that there would still be 
many shuffle blocks, and if the read task is retried it would be slower than 
using `repartition(1)` directly.

Now I lean toward fixing the issue the latter way (fixing the shuffle fetch 
order), since it may resolve the general case.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-28 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20414
  
> Actually for the first case, you shall use coalesce() instead of 
repartition() to get a similar effect, without need of another shuffle! 

Not quite - coalesce will not combine partitions across executors (aka 
shuffle), so you could still end up having many many files.

I have seen that quite a bit with large-scale ML. But FWIW, my earlier 
comment was for both "regular" use cases and ML use cases.



---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-28 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20414
  
Talked to @yanboliang offline; he said the major use cases of 
RDD/DataFrame.repartition() in ML workloads he has observed are:
1. When saving models, you may need `repartition()` to reduce the number of 
output files; a typical special case is `xxx.repartition(1)`;
2. You may use `repartition()` to give the original data set more 
partitions, to gain higher parallelism for the following operations.

Actually, for the first case, you should use `coalesce()` instead of 
`repartition()` to get a similar effect without needing another shuffle! Also, 
this scenario doesn't strictly require the data set to be distributed evenly, so 
the change from round-robin partitioning to hash partitioning should be fine.
For the latter case, if you have a bunch of data with the same values, the 
change may lead to high data skew and bring a performance regression. Currently, 
the best-effort approach we can choose is to perform a local sort if the data 
type is comparable (and that also requires a lot of work refactoring the 
`ExternalSorter`).
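The difference between the two partitioning schemes can be sketched as follows (simplified: as far as we can tell, the real `coalesce(shuffle = true)` path picks the round-robin starting position per input partition, which is modeled here only by the `start` parameter; `PartitioningSketch` is hypothetical). Round-robin assigns a record by its position, so a different fetch order moves records between output partitions; hash partitioning depends only on the record, which also means identical values all land in the same partition (the skew concern).

```scala
object PartitioningSketch {
  // Round-robin: the i-th record goes to (start + i) % numPartitions, so a record's
  // target depends on its position in the iteration order. Assumes distinct records.
  def roundRobin(records: Seq[String], numPartitions: Int, start: Int = 0): Map[String, Int] =
    records.zipWithIndex.map { case (r, i) => r -> (start + i) % numPartitions }.toMap

  // Hash partitioning: the target depends only on the record itself, so input order
  // is irrelevant, but all equal records collapse into one partition (possible skew).
  def hashPartition(records: Seq[String], numPartitions: Int): Map[String, Int] =
    records.map(r => r -> Math.floorMod(r.hashCode, numPartitions)).toMap
}
```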

Another approach is to let the `ShuffleBlockFetcherIterator` remember the 
sequence of block fetches and force the blocks to be fetched one by one. This 
actually involves more issues, because we may hit the memory limit and 
therefore have to spill the fetched blocks. IIUC this should resolve the 
issue for the general case.
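A sketch of the "remember the sequence" idea, assuming the fixed sequence is simply map index 0, 1, 2, ... and every index arrives exactly once (`InOrderFetchSketch` is hypothetical, not Spark code): blocks that arrive early are buffered until all earlier blocks have been emitted. The returned peak buffer size hints at the memory problem mentioned above, since in the worst case nearly every block is held before the first one arrives.

```scala
import scala.collection.mutable

object InOrderFetchSketch {
  // Emits blocks in a fixed sequence (map index 0, 1, 2, ...) even though they
  // arrive out of order. Early arrivals wait in `pending` until every earlier
  // block has been emitted. Returns (emitted records, peak number of buffered blocks).
  def emitInOrder(arrivals: Seq[(Int, Seq[String])]): (Seq[String], Int) = {
    val pending = mutable.Map.empty[Int, Seq[String]]
    val out = mutable.ArrayBuffer.empty[String]
    var next = 0
    var peak = 0
    for ((mapIndex, records) <- arrivals) {
      pending(mapIndex) = records
      peak = math.max(peak, pending.size)
      // Drain every block that is now contiguous with what has been emitted so far.
      while (pending.contains(next)) {
        out ++= pending.remove(next).get
        next += 1
      }
    }
    (out.toSeq, peak)
  }
}
```

In a real reader the `pending` buffer is exactly what might have to be spilled when it grows too large.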


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-28 Thread shivaram
Github user shivaram commented on the issue:

https://github.com/apache/spark/pull/20414
  
@jiangxb1987 @mridulm Could we have a special case of using the sort-based 
approach when the RDD type is comparable? I think that should cover a bunch of 
the common cases, and the hash version would only be used when keys are not 
comparable.
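A minimal sketch of the sort-based special case, assuming the record type has an `Ordering` (the hypothetical `SortThenRoundRobinSketch` is not Spark code): sorting the partition locally before the round-robin assignment makes the assignment independent of fetch order. This relies on the ordering being consistent with equality; records that compare equal but differ in other fields could still be swapped.

```scala
object SortThenRoundRobinSketch {
  // Sort locally first, then assign round-robin by position in the sorted order.
  // The result no longer depends on the order in which records were fetched.
  def assign[T: Ordering](records: Seq[T], numPartitions: Int): Seq[(T, Int)] =
    records.sorted.zipWithIndex.map { case (r, i) => (r, i % numPartitions) }
}
```

For example, `assign(Seq(3, 1, 2), 2)` and `assign(Seq(2, 3, 1), 2)` both produce `(1,0), (2,1), (3,0)`, at the price of a local sort per partition.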

Also @mridulm, your point about more things than repartition being 
affected is definitely true (just in this file, I think `randomSampleWithRange` 
is affected). I think the only way to solve this in general is to enforce 
deterministic ordering when constructing ShuffleRDDs?


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20414
  
Just for context, I'm seeing RDD.repartition being used *a lot*.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86728/


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20414
  
**[Test build #86728 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86728/testReport)**
 for PR 20414 at commit 
[`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/20414
  
In addition, any use of random in Spark code will be affected by this unless 
the input is an idempotent source, even if random initialization is done 
predictably with the partition index (which we were doing here anyway).
We might want to look at MLlib and other places as well.
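The point about seeding can be shown with a Bernoulli sampler whose `Random` is seeded with the partition index (`SeededSampleSketch` is a hypothetical helper, not one of Spark's samplers): the seed fixes the sequence of draws, but which record each draw is applied to still depends on iteration order, so a re-fetched partition in a different order can yield a different sample.

```scala
import scala.util.Random

object SeededSampleSketch {
  // Bernoulli sampling seeded by the partition index. The draws are reproducible,
  // but the i-th draw always applies to the i-th record in iteration order.
  def sample(partitionIndex: Int, records: Seq[String], fraction: Double): Seq[String] = {
    val rng = new Random(partitionIndex)
    records.filter(_ => rng.nextDouble() < fraction)
  }
}
```

Only the number of kept records is order-independent here; which records are kept is not.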


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20414
  
**[Test build #86728 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86728/testReport)**
 for PR 20414 at commit 
[`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/304/


---
