Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
Thanks everyone! I closed this in favor of #22112
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@jiangxb1987 I am guessing we should close this PR?
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
I tried a prototype to fix the handling of fetch failure; it seems not that
hard: https://github.com/apache/spark/pull/22112
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
So I think the assumption is that task results are idempotent but not
ordered. Sorry if that's contradictory. The data itself has to be the same on
rerun, but the order of things in there
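A minimal sketch of the order-sensitivity being described here (hypothetical Scala, not Spark's actual repartition code):
```
// Round-robin assignment: a record's target partition depends only on its
// position in the iterator, not on the record itself, so the same multiset
// of records iterated in a different order is routed differently on recompute.
def roundRobinAssign[T](records: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
  var position = -1
  records.map { record =>
    position += 1
    (position % numPartitions, record)
  }
}

val runA = roundRobinAssign(Iterator("x", "y", "z"), 2).toList // List((0,x), (1,y), (0,z))
val runB = roundRobinAssign(Iterator("y", "x", "z"), 2).toList // List((0,y), (1,x), (0,z))
// Same records both times, but "x" and "y" swap reducers between the runs.
```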
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
> ... assume that computation is idempotent - we do not support
non-determinism in computation
Ah, this is a reasonable restriction; we should document it in the RDD
classdoc. How about
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
> I guess on the RDD side it's not called RoundRobinPartitioner
Thanks for clarifying, @tgravescs! I was looking at `RangePartitioner` and
variants and was wondering what I was missing - did
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@cloud-fan I think we have to be clear on the boundaries of the solution we
can provide in Spark.
> RDD#mapPartitions and its friends can take arbitrary user functions,
which may produce
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
> @squito @tgravescs I am probably missing something about why the hash
> partitioner helps, can you please clarify?
> IIRC the partitioner for CoalescedRDD when shuffle is enabled is
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
Thanks @cloud-fan, your summary above is super useful, and I think it's
clear enough.
> So when we see fetch failure and rerun map tasks, we should track which
reducers have their shuffle
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
I took a quick look at the shuffle writer and I feel it will be hard to
insert a sort there.
I have a simpler proposal for the fix. To trigger this bug, there must be a
shuffle before the
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@squito @tgravescs I am probably missing something about why the hash
partitioner helps, can you please clarify?
IIRC the partitioner for CoalescedRDD when shuffle is enabled is
HashPartitioner
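A sketch of why a hash partitioner sidesteps the ordering problem (assumed semantics, modeled on what `HashPartitioner` does):
```
// The target partition is a pure function of the record, so the iteration
// order on recompute is irrelevant. The trade-off: heavily repeated keys all
// land in the same partition, which is the skew concern raised in this thread.
def hashAssign(record: Any, numPartitions: Int): Int = {
  val raw = record.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw // keep the partition id non-negative
}
```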
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@tgravescs I vaguely remember someone at y! labs telling me (more than a
decade back) about MR always doing a sort as part of its shuffle to avoid a
variant of this problem by design.
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
I also think @tgravescs's solution of using the HashPartitioner is an
acceptable one, though as you've noted it doesn't deal w/ skew (which may be a
lot of the existing use of `repartition()`). I
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
@tgravescs I'm still working on this, but I would be glad if you could also
work on the "sort the serialized bytes of T" approach; actually the
retry-all-tasks approach seems more complex than I
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/21698
> "you can at least sort the serialized bytes of T"
I think this should work.
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
@jiangxb1987 can you clarify if you are working on this still or if you
won't have time for a bit?
@mridulm @zsxwing @cloud-fan thoughts on @squito's approach to "you can at
least
Github user markhamstra commented on the issue:
https://github.com/apache/spark/pull/21698
> I really disagree with this.
I really agree with Tom. At this point, I think the working assumption
should be that any 2.4.0 release candidate that doesn't deliver some fix for
this
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
If you are looking at recomputing, how are you going to handle it if some tasks
have already written output? This was brought up by @cloud-fan above and I
didn't see a response. Some output formats
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
We fixed the DataFrame repartition correctness issue by inserting a local
sort before repartition, and the feedback for this approach is generally negative
because the performance of repartition()
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
yeah, you'd have to sort the entire record. I think originally it didn't
seem like that would work, because you don't know that `T` is sortable for
`RDD[T]`. But after serializing, you've got bytes, so
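One reading of that idea as a sketch (hypothetical, not the PR's code): `T` carries no `Ordering`, but its serialized form can always be compared lexicographically as bytes.
```
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize a record with plain Java serialization (a stand-in for Spark's
// configured serializer) so it can be compared as raw bytes.
def serialize(record: AnyRef): Array[Byte] = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(record)
  out.close()
  buffer.toByteArray
}

// Lexicographic (unsigned) comparison of two byte arrays.
def lexLess(a: Array[Byte], b: Array[Byte]): Boolean = {
  var i = 0
  while (i < math.min(a.length, b.length)) {
    val diff = (a(i) & 0xff) - (b(i) & 0xff)
    if (diff != 0) return diff < 0
    i += 1
  }
  a.length < b.length
}

// Impose a canonical order on a partition: whatever order its records arrive
// in, the sorted-by-bytes sequence is the same, which makes a subsequent
// round-robin assignment deterministic.
def canonicalOrder[T <: AnyRef](partition: Iterator[T]): Iterator[T] =
  partition.toVector
    .map(r => (serialize(r), r))
    .sortWith((x, y) => lexLess(x._1, y._1))
    .map(_._2)
    .iterator
```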
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
Thinking about the sorting approach again, I don't think it works unless
you sort both the keys and the values themselves.
For instance, let's say we have a groupByKey which generates:
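The example is truncated in the archive; as a hypothetical illustration of the concern:
```
// After a groupByKey, the key is stable but the value collection is built in
// fetch order, so two runs can produce the same logical record with different
// byte representations.
val run1 = ("k", Seq(1, 2, 3)) // values fetched in one order
val run2 = ("k", Seq(3, 1, 2)) // same values, different fetch order
// run1 != run2 as serialized bytes, so sorting whole records by their bytes
// still yields different canonical orders across the two runs; the values
// inside each record would need canonicalizing too.
```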
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/21698
> IIUC a streaming query always needs to specify a checkpoint location?
You can use a batch query to read and write Kafka :) My point is, if the
input and output data sources are not
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
OK, the proposal @squito had to sort on the binary/serialized data seems
like at least a good short-term solution. Any sorting is definitely going to
add overhead, but at least it's
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
> What happened to the other PR's proposal of just using the HashPartitioner?
It's not able to handle the use case where we use `repartition()` to shuffle
skewed data, so we gave up on
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
Still catching up on this and trying to understand all the cases.
What happened to the other PR's proposal of just using the HashPartitioner?
Did we give up on that because of the skewed
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
> What if the user doesn't provide a distributed file system path? E.g., you
can read from Kafka and write back to Kafka, and such workloads don't need
a distributed file system in standalone
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
IIUC a streaming query always needs to specify a checkpoint location?
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/21698
> I also like ideas based on checkpointing
What if the user doesn't provide a distributed file system path? E.g., you
can read from Kafka and write back to Kafka, and such workloads
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
> The above fix proposal requires more code refactoring of DAGScheduler,
and it will consume some memory to store additional information (assuming you
have M active/finished stages, and N stages
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
More thoughts: what if the last step of the job is writing data out? Do we
need to improve the `OutputCoordinator` to support canceling all the writing
tasks? Shall we simplify the logic and just
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
> statistically fine considering most Spark jobs are short-running and
don't hit FetchFailure quite often (The major advantage of this approach is
that you don't pay any penalty if you don't hit
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
Actually I have been thinking about the issue, and I feel we can still go
further with the current patch to fix the issue with child stages that @mridulm
mentioned above; that requires tracking all the
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
I'm still working on this, hopefully I can update this PR by the end of
this week. Sorry I didn't get back to this earlier because I was working on the
barrier scheduling issue.
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
@tgravescs it's not guaranteed to reproduce with that. IIUC, you need to do
a repartition in the same stage that also does a shuffle-read, then have a
fetch failure, and on recompute that stage
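A hedged sketch of that shape (illustrative sizes; `sc` is an assumed `SparkContext`, and the failure injection itself is cluster-dependent):
```
// Stage 1 shuffle-writes the groupByKey output; stage 2 shuffle-reads that
// unordered data and round-robin shuffle-writes it for the repartition.
// Killing an executor while stage 2 runs triggers a FetchFailed, and the
// recompute of stage 2 can route records differently than the first attempt.
val result = sc.parallelize(1 to 1000000, 100)
  .map(i => (i % 1000, i))
  .groupByKey()
  .repartition(50)
result.count()
```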
Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
Sorry for coming in late on this; the first I saw of it was the other day.
Could someone perhaps summarize the discussions here and exactly when this
happens and why? Checkpointing was
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
Sorry I got bogged down in some other things; thanks for the responses:
>> on a fetch-failure in repartition, fail the entire job
> Currently I can't figure out a case that a
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
> > checkpoint can not guarantee that you shall always get the same output
...
>
> IIRC we can checkpoint to HDFS? Then it becomes reliable.
Sure, thanks for clarifying that.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
> checkpoint can not guarantee that you shall always get the same output ...
IIRC we can checkpoint to HDFS? Then it becomes reliable.
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
> I see some discussion about making shuffles deterministic, but it proved
to be very difficult. Is there a prior discussion on this you can point me to?
Is it that even if you used
Github user squito commented on the issue:
https://github.com/apache/spark/pull/21698
Jumping in the middle of this discussion -- everybody has raised some great
points.
1) My first takeaway from this is just how hard it can be to reason about
this, because Spark's exact
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
Ah I see, then we need to change DAGScheduler a lot to fix it, which may
not be worth it.
I still insist that the `repartition` issue should be treated as a bug, but it's
not easy to hit (when the input
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
Actually I think @mridulm has a point here - if we only retry all the
tasks for repartition/zip*, it's still possible that some tasks in a succeeding
stage may have finished before the retry, and
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
I think all we discussed here is about promise and expectation, and your
zip example does not seem to be an issue: according to the promise, Spark does
not guarantee the output order of unsorted
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
Taking a step back and analyzing the solution for the problem at hand, there
are three main issues with the proposal:
* It does not solve the problem in a general manner.
* I
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
OK, we can treat it as data loss. However, it's not caused by Spark but by
the user. If a user calls `zip` and then uses a custom function to
compute keys from the zipped pairs, and
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@jiangxb1987 data loss comes because a re-execution of zip might generate a
key for which the corresponding reducer has already finished.
Hence re-execution of the stage will not result in subsequent
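A hypothetical plain-Scala illustration of that mechanism, with `computeKey` standing in for the user function:
```
// The same records, iterated in a different order on re-execution, zip into
// different pairs, so a user-defined computeKey can emit different keys the
// second time around.
def computeKey(a: Int, b: Int): Int = a * 31 + b // hypothetical user function

val firstRun  = Seq(1, 2, 3).zip(Seq(10, 20, 30)).map { case (a, b) => computeKey(a, b) }
val recompute = Seq(2, 1, 3).zip(Seq(10, 20, 30)).map { case (a, b) => computeKey(a, b) }
// firstRun = List(41, 82, 123); recompute = List(72, 51, 123). A reducer that
// already fetched key 41 from the first attempt never sees it regenerated, so
// those rows silently disappear from the final output.
```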
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@cloud-fan That depends on what the computeKey is doing - which is
user-defined. It can have different values, or it need not (again, it depends
on the user data and the closure being applied).
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
IIUC the output produced by `rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2)))`
shall always have the same cardinality, no matter how many tasks are retried, so where
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
> Given this, there is no ambiguity in cardinality of zip().map() ... which
two tuples from rdd1 and rdd2 get zip'ed together can be arbitrary: and I
agree with that.
Yes, but the
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@cloud-fan There is no ambiguity in the output of map - one record in, one
record out.
In the case of zip, as you said, the number of output records is the min of
the two. Given this, there is no ambiguity in
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
@mridulm you provided a good example to show the indeterminacy of `zip`.
```
rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2))).groupByKey().map().save()
```
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@jiangxb1987 A different number of output rows is due to data loss - it is
not another valid run.
A complete re-execution of the job in this case could result in a different
ordering, but
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
> A synthetic example:
> rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2))).groupByKey().map().save()
The above example may create some different output
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@cloud-fan We should not look at a particular stage in isolation, but
rather at what happens when there are failures in the middle of a job with
multiple shuffle stages - and zip is one of the
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
For `zip`, it's hard to define which result is "correct", given the fact
that RDD is unordered. I think sample should be similar.
`repartition` is special because we may change the
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@cloud-fan The difference would be between a (user-)defined record order
(global sort or local sort) and the expectation of a repeatable record order
on recomputation.
It might also be a good idea to
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21698
IMO an RDD, as a distributed data set, should not guarantee any record order
unless you sort it. So user functions and Spark internal functions should not
expect a specific record order.
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
@jiangxb1987 Any closure sensitive to iteration order [1] is affected by
this - under this set of circumstances.
If we cannot solve it in a principled manner (making shuffle repeatable, which
I
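A hypothetical example of such an order-sensitive closure, in plain Scala:
```
// Indices assigned by position change when the same records are iterated in a
// different order on recompute, even though the multiset of records does not.
val firstRun  = Seq("a", "b", "c").zipWithIndex // List((a,0), (b,1), (c,2))
val recompute = Seq("b", "a", "c").zipWithIndex // List((b,0), (a,1), (c,2))
```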
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
Thank you for your comments, @mridulm!
We focus on resolving the RDD.repartition() correctness issue here in this
PR, because it is the most commonly used, and we can still address the
Github user mridulm commented on the issue:
https://github.com/apache/spark/pull/21698
I did not go over the PR itself in detail, but the proposal sounds very
expensive - particularly given the cascading costs involved.
Also, I am not sure why we are special-casing only
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92616/
Test PASSed.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Merged build finished. Test PASSed.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21698
**[Test build #92616 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92616/testReport)** for PR 21698 at commit
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/679/
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21698
**[Test build #92616 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92616/testReport)** for PR 21698 at commit
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
Thanks @cloud-fan @viirya, comments addressed :)
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92527/
Test FAILed.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21698
**[Test build #92527 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92527/testReport)** for PR 21698 at commit
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Merged build finished. Test FAILed.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21698
**[Test build #92527 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92527/testReport)** for PR 21698 at commit
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/621/
Github user jiangxb1987 commented on the issue:
https://github.com/apache/spark/pull/21698
retest this please
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92526/
Test FAILed.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Merged build finished. Test FAILed.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21698
**[Test build #92526 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92526/testReport)** for PR 21698 at commit
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Merged build finished. Test PASSed.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21698
**[Test build #92526 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92526/testReport)** for PR 21698 at commit
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21698
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/620/