Re: A couple questions about shared variables

2014-09-24 Thread Nan Zhu
I proposed a fix: https://github.com/apache/spark/pull/2524

Glad to receive feedback.

--  
Nan Zhu


On Tuesday, September 23, 2014 at 9:06 PM, Sandy Ryza wrote:

 Filed https://issues.apache.org/jira/browse/SPARK-3642 for documenting these 
 nuances.
  
 -Sandy
  

Re: A couple questions about shared variables

2014-09-22 Thread Nan Zhu
If you think it is necessary to fix, I would like to resubmit that PR (it
seems to have some conflicts with the current DAGScheduler).

My suggestion is to make this an option on the accumulator: some algorithms
use accumulators to compute results and need a deterministic accumulator,
while others implementing something like Hadoop counters may want the
current behavior (count everything that happened, including duplicates).
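
For concreteness, a hypothetical sketch of what such an option could look
like (the deterministic flag below does not exist in Spark's API; it is only
meant to illustrate the proposal):

    // Hypothetical API, for illustration only: an opt-in flag choosing
    // between exactly-once semantics and count-everything semantics.
    val resultSum = sc.accumulator(0, name = "sum", deterministic = true)
    val hadoopStyleCount = sc.accumulator(0, name = "recordsSeen", deterministic = false)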

Your thoughts?  

--  
Nan Zhu



Re: A couple questions about shared variables

2014-09-22 Thread Sandy Ryza
MapReduce counters do not count duplicates. In MapReduce, if a task needs to
be re-run, the counter value from the second attempt overwrites the value
from the first.
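
To make the contrast concrete, a minimal sketch in plain Scala (not actual
MapReduce or Spark code) of the two update semantics:

    import scala.collection.mutable

    // MapReduce-style counters: keep one value per task and overwrite it on
    // re-run, so duplicate attempts never inflate the total.
    val perTaskCounts = mutable.Map[Int, Long]()
    def taskFinished(taskId: Int, count: Long): Unit =
      perTaskCounts(taskId) = count            // a re-run replaces, not adds
    def totalCount: Long = perTaskCounts.values.sum

    // Current Spark accumulators: fold every update into one running value,
    // so a re-run of the same task adds a second contribution.
    var accumulated = 0L
    def applyUpdate(count: Long): Unit = accumulated += count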

-Sandy


Re: A couple questions about shared variables

2014-09-22 Thread Nan Zhu
I see, thanks for pointing this out.


--  
Nan Zhu




Re: A couple questions about shared variables

2014-09-21 Thread Matei Zaharia
Hmm, good point, this seems to have been broken by refactorings of the 
scheduler, but it worked in the past. Basically the solution is simple -- in a 
result stage, we should not apply the update for each task ID more than once -- 
the same way we don't call job.listener.taskSucceeded more than once. Your PR 
also tried to avoid this for resubmitted shuffle stages, but I don't think we 
need to do that necessarily (though we could).
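
An illustrative sketch of that idea (not the actual DAGScheduler code; the
mergeAccumulatorUpdates helper is assumed): apply a result-stage task's
accumulator updates at most once per partition, the same way
job.listener.taskSucceeded is only invoked once.

    import scala.collection.mutable

    // Partitions whose accumulator updates have already been applied.
    val appliedPartitions = mutable.HashSet[Int]()

    def onResultTaskSuccess(partitionId: Int, accumUpdates: Map[Long, Any]): Unit = {
      if (appliedPartitions.add(partitionId)) {
        // First successful completion of this partition: merge its updates.
        mergeAccumulatorUpdates(accumUpdates)   // assumed helper
      }
      // Later completions of the same partition (speculative copies or
      // resubmitted tasks) are ignored, so each update counts once.
    }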

Matei

On September 21, 2014 at 1:11:13 PM, Nan Zhu (zhunanmcg...@gmail.com) wrote:

Hi, Matei, 

Can you give a hint on how the current implementation guarantees the
accumulator is only applied once?

There is a pending PR trying to achieve this
(https://github.com/apache/spark/pull/228/files), but from the current
implementation I didn't see that this has been done (maybe I missed
something).

Best,

-- 
Nan Zhu


Re: A couple questions about shared variables

2014-09-20 Thread Matei Zaharia
Hey Sandy,

On September 20, 2014 at 8:50:54 AM, Sandy Ryza (sandy.r...@cloudera.com) wrote:

Hey All, 

A couple questions came up about shared variables recently, and I wanted to 
confirm my understanding and update the doc to be a little clearer. 

*Broadcast variables* 
Now that task data is automatically broadcast, the only occasions where it 
makes sense to explicitly broadcast are: 
* You want to use a variable from tasks in multiple stages. 
* You want to have the variable stored on the executors in deserialized 
form. 
* You want tasks to be able to modify the variable and have those 
modifications take effect for other tasks running on the same executor 
(usually a very bad idea). 

Is that right? 
Yeah, pretty much. Reason 1 above is probably the biggest, but 2 also matters. 
(We might later factor tasks in a different way to avoid 2, but it's hard due 
to things like Hadoop JobConf objects in the tasks).
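
For illustration, a minimal sketch (assuming the Spark 1.x Scala API, with a
made-up lookup table and input path) of reason 1: broadcasting a variable so
it is shipped to each executor once and reused across two stages, instead of
being re-serialized with every task closure.

    // Explicitly broadcast a small lookup table used by tasks in two stages.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))   // hypothetical data

    val counts = sc.textFile("input.txt")                // hypothetical path
      .map(word => lookup.value.getOrElse(word, 0))      // stage 1
      .map(x => (x % 10, x))
      .reduceByKey(_ + _)                                // shuffle boundary
      .mapValues(sum => sum + lookup.value.size)         // stage 2, reused
    counts.collect()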


*Accumulators* 
Values are only counted for successful tasks. Is that right? KMeans seems 
to use them in this way. What happens if a node goes away and successful 
tasks need to be resubmitted? Or if the stage runs again because a different 
job needed it? 
Accumulators are guaranteed to give a deterministic result if you only 
increment them in actions. For each result stage, the accumulator's update from 
each task is only applied once, even if that task runs multiple times. If you 
use accumulators in transformations (i.e. in a stage that may be part of 
multiple jobs), then you may see multiple updates, from each run. This is kind 
of confusing but it was useful for people who wanted to use these for debugging.
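
A minimal sketch (assuming the Spark 1.x Scala API) of the distinction: the
accumulator updated in foreach (an action) is counted once per task, while
the one updated inside map (a transformation) may be updated again whenever
the uncached stage is recomputed for another job.

    val inAction = sc.accumulator(0, "updated in an action")
    val inTransformation = sc.accumulator(0, "updated in a transformation")

    val mapped = sc.parallelize(1 to 100).map { x =>
      inTransformation += 1                    // transformation-side update
      x * 2
    }
    mapped.foreach(_ => inAction += 1)         // job 1 (an action)
    mapped.count()                             // job 2 re-runs the map stage

    // inAction.value is 100; inTransformation.value may be 200 here, because
    // `mapped` is not cached and the map stage ran in both jobs.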

Matei





thanks, 
Sandy