Re: Can't get SparkListener to work

2015-04-21 Thread Shixiong Zhu
You need to call sc.stop() to wait for the notifications to be processed.
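
For example, a minimal sketch adapting the TestME code quoted below (sc.stop()
is the standard SparkContext shutdown; per the above, it lets queued listener
notifications be processed before the driver exits):

import org.apache.spark.{SparkConf, SparkContext}

object TestME {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("testme"))
    try {
      foo(sc) // foo registers the listener and runs the job (quoted below)
    } finally {
      sc.stop() // replaces the Thread.sleep(2000) workaround
    }
  }
}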

Best Regards,
Shixiong(Ryan) Zhu

2015-04-21 4:18 GMT+08:00 Praveen Balaji secondorderpolynom...@gmail.com:

 Thanks Shixiong. I tried it out and it works.

 If you're looking at this post, here are a few points you may be interested in:

 It turns out this has to do with the difference between methods and
 functions in Scala - something I didn't pay attention to before. If you're
 following this thread, this may be an interesting post:

 http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html
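
 For instance, here is a minimal sketch of my own (not from the post) showing
 the distinction:

 // Referencing this method evaluates its body immediately, so the throw
 // happens before any function value is produced:
 def eager: Int => Int = throw new RuntimeException("thrown while creating the function")

 // A function literal only throws when it is applied to an element:
 val deferred: Int => Int = i => throw new RuntimeException("thrown when applied")

 // List(1, 2, 3).map(eager)    // would throw here; map is never called
 // List(1, 2, 3).map(deferred) // map would run; the throw happens inside it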

 Below is some test code. I added the Thread.sleep because it looks like
 Spark notifications happen asynchronously and the main/driver thread won't
 wait for the notifications to be complete. I'll look at that further later,
 but for now that's my inference, so don't take my word for it yet. Here's
 the code:

 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.scheduler._

 object TestME {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf().setAppName("testme");
     val sc = new SparkContext(conf);
     try {
       foo(sc);
     } finally {
       Thread.sleep(2000);
     }
   }

   def foo(sc: SparkContext) = {
     sc.addSparkListener(new SparkListener() {
       override def onTaskStart(e: SparkListenerTaskStart) = println("onTaskStart");
       override def onTaskEnd(e: SparkListenerTaskEnd) = println("onTaskEnd");
     });

     sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
   }
 }

 I'm running it from Eclipse on local[*].




Re: Can't get SparkListener to work

2015-04-19 Thread Shixiong Zhu
The problem is the code you use to test:

sc.parallelize(List(1, 2, 3)).map(throw new
SparkException("test")).collect();

is like the following example:

def foo: Int => Nothing = {
  throw new SparkException("test")
}
sc.parallelize(List(1, 2, 3)).map(foo).collect();

So the Spark job is never actually submitted, since it fails in `foo`,
which is evaluated to create the map function.

Change it to

sc.parallelize(List(1, 2, 3)).map(i => throw new
SparkException("test")).collect();

And you will see the correct messages from your listener.
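
To see the difference concretely, here is a minimal sketch (not from the
thread; it assumes a live SparkContext `sc`, as in spark-shell):

import org.apache.spark.SparkException

// Variant 1: the argument expression throws on the driver while map's
// argument is being evaluated. No job is submitted, so no listener events.
try {
  sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect()
} catch {
  case e: SparkException => println("failed on the driver: " + e.getMessage)
}

// Variant 2: the lambda throws per element on the executors. A job is
// submitted first, so events such as onJobStart and onTaskEnd are posted.
try {
  sc.parallelize(List(1, 2, 3)).map((i: Int) => throw new SparkException("test")).collect()
} catch {
  case e: SparkException => println("job failed after submission: " + e.getMessage)
}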



Best Regards,
Shixiong(Ryan) Zhu




Re: Can't get SparkListener to work

2015-04-19 Thread Praveen Balaji
Thanks Shixiong. I'll try this.



Re: Can't get SparkListener to work

2015-04-18 Thread Praveen Balaji
Thanks for the response, Archit. I get callbacks when I do not throw an
exception from map.
My use case, however, is to get callbacks for exceptions in transformations
on executors. Do you think I'm going down the right route?

Cheers
-p
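
One way to get callbacks for executor-side exceptions is the TaskEndReason
carried by onTaskEnd; here is a minimal sketch (my own, assuming the Spark 1.x
API and a live SparkContext `sc`, not code from this thread):

import org.apache.spark.ExceptionFailure
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(e: SparkListenerTaskEnd) = e.reason match {
    // ExceptionFailure carries the class name and message of an exception
    // thrown inside a task on an executor.
    case f: ExceptionFailure => println("task failed: " + f.className + ": " + f.description)
    case _ => // success and other end reasons
  }
})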



Re: Can't get SparkListener to work

2015-04-18 Thread Archit Thakur
Hi Praveen,
Can you try once removing the throw from the map? Do you still not get the callbacks?






Re: Can't get SparkListener to work

2015-04-17 Thread Imran Rashid
when you start the spark-shell, it's already too late to get the
ApplicationStart event. Try listening for StageCompleted or JobEnd instead.
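
For instance, a sketch along those lines (mine, not Imran's):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

sc.addSparkListener(new SparkListener() {
  // These events fire for every job run after registration, so they work
  // even though the application itself started before the listener existed.
  override def onStageCompleted(e: SparkListenerStageCompleted) =
    println("onStageCompleted: stage " + e.stageInfo.stageId)
  override def onJobEnd(e: SparkListenerJobEnd) =
    println("onJobEnd: job " + e.jobId)
})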

On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji 
secondorderpolynom...@gmail.com wrote:

 I'm trying to create a simple SparkListener to get notified of errors on
 executors, but I do not get any callbacks on my SparkListener. Here is some
 simple code I'm executing in spark-shell. Am I doing something wrong?

 Thanks for any clue you can send my way.

 Cheers
 Praveen

 ==
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.scheduler.SparkListenerApplicationStart
 import org.apache.spark.scheduler.SparkListenerApplicationEnd
 import org.apache.spark.SparkException

 sc.addSparkListener(new SparkListener() {
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
     println("onApplicationStart: " + applicationStart.appName);
   }

   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
     println("onApplicationEnd: " + applicationEnd.time);
   }
 });

 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException("test")).collect();
 ===

 output:

 scala> org.apache.spark.SparkException: hshsh
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
 at $iwC$$iwC$$iwC.<init>(<console>:34)
 at $iwC$$iwC.<init>(<console>:36)
 at $iwC.<init>(<console>:38)




Re: Can't get SparkListener to work

2015-04-17 Thread Praveen Balaji
Thanks for the response, Imran. I probably chose the wrong methods for this
email. I implemented all methods of SparkListener and the only callback I
get is onExecutorMetricsUpdate.

Here's the complete code:

==

import org.apache.spark.SparkException
import org.apache.spark.scheduler._

sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(e: SparkListenerStageCompleted) = println("onStageCompleted");
  override def onStageSubmitted(e: SparkListenerStageSubmitted) = println("onStageSubmitted");
  override def onTaskStart(e: SparkListenerTaskStart) = println("onTaskStart");
  override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println("onTaskGettingResult");
  override def onTaskEnd(e: SparkListenerTaskEnd) = println("onTaskEnd");
  override def onJobStart(e: SparkListenerJobStart) = println("onJobStart");
  override def onJobEnd(e: SparkListenerJobEnd) = println("onJobEnd");
  override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println("onEnvironmentUpdate");
  override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println("onBlockManagerAdded");
  override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println("onBlockManagerRemoved");
  override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println("onUnpersistRDD");
  override def onApplicationStart(e: SparkListenerApplicationStart) = println("onApplicationStart");
  override def onApplicationEnd(e: SparkListenerApplicationEnd) = println("onApplicationEnd");
  override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println("onExecutorMetricsUpdate");
});

sc.parallelize(List(1, 2, 3)).map(throw new
SparkException("test")).collect();

=
