Github user rezasafi commented on the issue:
https://github.com/apache/spark/pull/19388
Thank you very much, @mridulm, for the comment. Yeah, I agree that the
provided solution is complex. The complexity comes from the fact that in this
solution both the executors and the driver use a jobId that is based on the
stageId. I tried a simpler solution in October that uses the rddId as a unified
ID between the driver and the executors, but that solution failed since the
committer was expecting a stageId as the jobId. Let me discuss this in more detail.
In the current code there is a call to commitTask (the taskContext here is
created using the stageId as the jobId):
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala#L134
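To illustrate, here is a rough sketch of how the task-side Hadoop context ends up carrying the stageId as its job ID. This is not the actual code in SparkHadoopWriter; the helper name `taskAttemptContextFor` is made up for this example, but the Hadoop classes are real:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object TaskSideContextSketch {
  // Illustrative only: the executor-side context is keyed by the *stage* ID,
  // so anything downstream that reads the context's job ID sees the stage ID.
  def taskAttemptContextFor(
      conf: Configuration,
      stageId: Int,        // used as the Hadoop "job" number on the task side
      partitionId: Int,
      attemptNumber: Int): TaskAttemptContext = {
    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
    val jobId = new JobID(jobTrackerId, stageId)
    val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
    val attemptId = new TaskAttemptID(taskId, attemptNumber)
    new TaskAttemptContextImpl(conf, attemptId)
  }
}
```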
That call ends up reaching here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L172
From there the call proceeds to the following point:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala#L73
which calls canCommit, and canCommit checks whether the jobId is an
actual stageId:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L89
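As a self-contained illustration of why that check only works when the jobId is a real stageId, here is a toy model of the coordination step. This is not the actual OutputCommitCoordinator API, just the shape of the lookup: the driver tracks state per stage ID, so an ID that is not a known stage (e.g. an rddId passed as the jobId) finds no entry and the commit is refused.

```scala
import scala.collection.mutable

object ToyCommitCoordinator {
  // One entry per running stage; maps partition -> attempt authorized to commit.
  private case class StageState(authorized: mutable.Map[Int, Int] = mutable.Map.empty)
  private val stageStates = mutable.Map.empty[Int, StageState]

  def stageStart(stageId: Int): Unit = stageStates(stageId) = StageState()

  // Mirrors the shape of canCommit(stage, partition, attemptNumber).
  def canCommit(stageId: Int, partition: Int, attemptNumber: Int): Boolean =
    stageStates.get(stageId) match {
      case Some(state) =>
        state.authorized.get(partition) match {
          case Some(winner) => winner == attemptNumber   // already decided
          case None =>
            state.authorized(partition) = attemptNumber  // first asker wins
            true
        }
      case None => false  // unknown "stage": an ID that is not a real stageId
    }
}
```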
If we want a simpler solution that creates the jobId from a monotonically
increasing value, then we need to change other parts of the code to remove the
dependence on the stageId. I need to check whether this actually results in a
simpler fix (especially with regard to SPARK-4879).
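For reference, the driver side of that alternative would look roughly like the sketch below. The object name WriteJobIds is hypothetical, not existing Spark code; the point is that the driver would allocate one ID per write job and ship the same value to every task, and the coordinator would then have to be keyed by that ID instead of by the stageId, which is the change to "other parts of the code" mentioned above.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: a single monotonically increasing commit-job ID,
// allocated once on the driver and reused verbatim by all tasks of that job.
object WriteJobIds {
  private val nextId = new AtomicInteger(0)
  def newJobId(): Int = nextId.getAndIncrement()
}
```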
I will update here as soon as I finish investigating this. Thank you
again.