Hi Nan,

Great digging in -- that makes sense to me for when a job produces output
handled by Spark, like a .count or .distinct or similar.

For the other part of the question, I'm also interested in side effects like
an HDFS disk write. If one task is writing to an HDFS path and a speculative
copy of that task starts up, wouldn't the copy also attempt to write to the
same path? How is that de-conflicted?


On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:

>  Hi, Mingyu,
>
> According to my understanding,
>
> Spark processes the result generated from each partition by passing it to
> resultHandler (SparkContext.scala L1056).
>
> This resultHandler usually just puts the result into a driver-side array,
> whose length is always partitions.size.
>
> This design effectively ensures that the actions are idempotent.
>
> e.g. the count is implemented as
>
> def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>
> Even if the task for a partition is executed twice, the result put into the
> array slot is the same.
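>
> For concreteness, here is a rough sketch of that idea in Scala -- not the
> actual Spark code, just made-up names to illustrate why a duplicate
> completion of the same partition cannot change the collected results:
>
> // Sketch only: each partition writes into its own slot of a driver-side
> // array, so a speculative duplicate just rewrites that slot with the same value.
> object ResultHandlerSketch {
>   def runJobSketch[T: scala.reflect.ClassTag](
>       numPartitions: Int, compute: Int => T): Array[T] = {
>     val results = new Array[T](numPartitions)
>     // analog of the resultHandler passed to SparkContext.runJob
>     def resultHandler(partitionId: Int, result: T): Unit =
>       results(partitionId) = result
>
>     resultHandler(0, compute(0))
>     resultHandler(0, compute(0)) // duplicate: same input, same slot, same value
>     (1 until numPartitions).foreach(p => resultHandler(p, compute(p)))
>     results
>   }
> }
>
> e.g. ResultHandlerSketch.runJobSketch(4, _ => 10L).sum always gives 40, no
> matter how many times any single partition's task completes -- which is the
> same reason count stays correct.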
>
>
>
> At the same time, I think the Spark implementation ensures that the
> operation applied to the return value of SparkContext.runJob will not be
> triggered again when the duplicate tasks finish.
>
> Because,
>
>
> When a task finishes, the code execution path is
> TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded.
>
> In taskEnded, it triggers the CompletionEvent message handler, where
> DAGScheduler checks if (!job.finished(rt.outputId)); rt.outputId is the
> partition id.
>
> So even if a duplicate task sends a CompletionEvent message, the handler
> will find that job.finished(rt.outputId) is already true and will ignore it.
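>
> A rough sketch of that check (again, not the actual DAGScheduler code, just
> an illustration under my reading of it):
>
> // Sketch only: a duplicate CompletionEvent for a partition that is already
> // marked finished falls through the if-check and is ignored.
> case class CompletionEvent(partitionId: Int, result: Any)
>
> class JobWaiterSketch(numPartitions: Int, resultHandler: (Int, Any) => Unit) {
>   private val finished = Array.fill(numPartitions)(false)
>   private var remaining = numPartitions
>
>   def handleTaskCompletion(event: CompletionEvent): Unit = {
>     if (!finished(event.partitionId)) {   // like !job.finished(rt.outputId)
>       finished(event.partitionId) = true
>       resultHandler(event.partitionId, event.result)
>       remaining -= 1                      // the job completes only once per partition
>     }
>   }
> }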
>
>
> Maybe I'm wrong… I just went through the code roughly, so feel free to
> correct me.
>
> Best,
>
>
> --
> Nan Zhu
>
> On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:
>
> Hi all,
>
> I was curious about the details of Spark speculation. My understanding is
> that when “speculated” tasks are newly scheduled on other machines, the
> original tasks keep running until the entire stage completes. This seems to
> leave some room for duplicated work, because some Spark actions are not
> idempotent. For example, a partition may be counted twice in the case of
> RDD.count, or written to HDFS twice in the case of RDD.save*(). How does
> Spark prevent this kind of duplicated work?
>
> Mingyu
>
>
>
>
