Database operations on executor nodes

2015-03-18 Thread Praveen Balaji
I was wondering what people generally do about performing database operations from
executor nodes. For now, I'm avoiding database updates on the executors so I don't
end up with a proliferation of database connections across the cluster. The general
pattern I've adopted is to collect queries (or tuples) on the executors and write
to the database from the driver.

// Driver: a collection accumulator to gather queries from the executors.
// (The declaration is added here so the snippet runs; accumulableCollection
// is the Spark 1.x API.)
import scala.collection.mutable
val accumulator = sc.accumulableCollection(mutable.ArrayBuffer[String]())

// Executes on the executors: build queries instead of opening connections
rdd.foreach(s => {
  val query = s"insert into ... ${s}" // actual SQL elided
  accumulator += query
})

// Executes on the driver: replay the collected queries over one connection
accumulator.value.foreach(query => {
  // get connection
  // update database
})

I’m obviously trading database connections for driver heap. How do other Spark
users do it?
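
For comparison, the approach I'm avoiding would look roughly like the sketch
below - one connection per partition via foreachPartition rather than one per
record. This is only my illustration, untested; the JDBC URL, credentials, and
table name are placeholders.

import java.sql.DriverManager

// Sketch only: assumes the JDBC driver jar is on the executor classpath
// and the database is reachable from the worker nodes.
rdd.foreachPartition(partition => {
  val conn = DriverManager.getConnection("jdbc:...", "user", "password")
  try {
    val stmt = conn.prepareStatement("insert into some_table values (?)")
    partition.foreach(s => {
      stmt.setString(1, s.toString)
      stmt.addBatch() // batch within the partition to cut round trips
    })
    stmt.executeBatch()
  } finally {
    conn.close() // at most one connection open per partition at a time
  }
})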

Cheers
Praveen



Can't get SparkListener to work

2015-04-17 Thread Praveen Balaji
I'm trying to create a simple SparkListener to get notified of errors on
executors, but I don't get any callbacks on it. Here is some simple code
I'm executing in spark-shell. Am I doing something wrong?

Thanks for any clue you can send my way.

Cheers
Praveen

==
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.SparkException

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println(">>>> onApplicationStart: " + applicationStart.appName)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println(">>>> onApplicationEnd: " + applicationEnd.time)
  }
})

sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect()
===

output:

scala> org.apache.spark.SparkException: hshsh
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
  at $iwC$$iwC$$iwC.<init>(<console>:34)
  at $iwC$$iwC.<init>(<console>:36)
  at $iwC.<init>(<console>:38)


Re: Can't get SparkListener to work

2015-04-17 Thread Praveen Balaji
Thanks for the response, Imran. I probably chose the wrong methods for this
email. I implemented all methods of SparkListener and the only callback I
get is onExecutorMetricsUpdate.

Here's the complete code:

==

import org.apache.spark.SparkException
import org.apache.spark.scheduler._

sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(e: SparkListenerStageCompleted) = println(">>>> onStageCompleted")
  override def onStageSubmitted(e: SparkListenerStageSubmitted) = println(">>>> onStageSubmitted")
  override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart")
  override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println(">>>> onTaskGettingResult")
  override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd")
  override def onJobStart(e: SparkListenerJobStart) = println(">>>> onJobStart")
  override def onJobEnd(e: SparkListenerJobEnd) = println(">>>> onJobEnd")
  override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println(">>>> onEnvironmentUpdate")
  override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println(">>>> onBlockManagerAdded")
  override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println(">>>> onBlockManagerRemoved")
  override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println(">>>> onUnpersistRDD")
  override def onApplicationStart(e: SparkListenerApplicationStart) = println(">>>> onApplicationStart")
  override def onApplicationEnd(e: SparkListenerApplicationEnd) = println(">>>> onApplicationEnd")
  override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println(">>>> onExecutorMetricsUpdate")
})

sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect()

=

On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid  wrote:

> when you start the spark-shell, it's already too late to get the
> ApplicationStart event. Try listening for StageCompleted or JobEnd instead.
>
> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji <
> secondorderpolynom...@gmail.com> wrote:
>
>> [original message quoted above; trimmed]
>


Re: Can't get SparkListener to work

2015-04-18 Thread Praveen Balaji
Thanks for the response, Archit. I get callbacks when I do not throw an
exception from map.
My use case, however, is to get callbacks for exceptions in transformations
on executors. Do you think I'm going down the right route?

Cheers
-p

On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur 
wrote:

> Hi Praveen,
> Can you try removing the throw from the map once? Do you still not get the callbacks?
> On Apr 18, 2015 8:14 AM, "Praveen Balaji" 
> wrote:
>
>> [earlier messages quoted above; trimmed]
>


Re: Can't get SparkListener to work

2015-04-19 Thread Praveen Balaji
Thanks Shixiong. I'll try this.

On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu  wrote:

> The problem is the code you use to test:
>
>
> sc.parallelize(List(1, 2, 3)).map(throw new
> SparkException("test")).collect();
>
> is like the following example:
>
> def foo: Int => Nothing = {
>   throw new SparkException("test")
> }
> sc.parallelize(List(1, 2, 3)).map(foo).collect();
>
> So the Spark job never actually gets submitted, since the failure happens
> in `foo`, which is evaluated on the driver to create the map function.
>
> Change it to
>
> sc.parallelize(List(1, 2, 3)).map(i => throw new
> SparkException("test")).collect();
>
> And you will see the correct messages from your listener.
>
>
>
> Best Regards,
> Shixiong(Ryan) Zhu
>
> 2015-04-19 1:06 GMT+08:00 Praveen Balaji:
>
>> [earlier messages quoted above; trimmed]


Re: Can't get SparkListener to work

2015-04-20 Thread Praveen Balaji
Thanks Shixiong. I tried it out and it works.

If you're looking at this thread, here are a few points you may be interested in.

It turns out this has to do with the difference between methods and
functions in Scala - something I hadn't paid attention to before. This is
an interesting post on the topic:

http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html
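
The distinction, as I understand it (a minimal sketch of my own, not taken
from the post):

// A method whose body builds the function to pass to map: the throw runs
// when the method is evaluated - on the driver, before any job is submitted.
def m: Int => Nothing = { throw new SparkException("test") }

// A function literal: the body runs only when the function is applied,
// i.e. inside a task on an executor.
val f = (i: Int) => throw new SparkException("test")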

Below is some test code. I added the Thread.sleep because it looks like
Spark notifications are delivered asynchronously and the main/driver thread
won't wait for them to complete. I'll look into that further later; for now
that's my inference, so don't take my word for it yet. Here's the code:

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

object TestME {
  def main(args: Array[String]): Unit = {
    // setMaster added so the snippet runs standalone; I launch it from
    // Eclipse on local[*] as mentioned below
    val conf = new SparkConf().setAppName("testme").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      foo(sc)
    } finally {
      // give the async listener bus a chance to deliver events before exit
      Thread.sleep(2000)
    }
  }

  def foo(sc: SparkContext) = {
    sc.addSparkListener(new SparkListener() {
      override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart")
      override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd")
    })

    sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect()
  }
}

I'm running it from Eclipse on local[*].
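
A possible alternative to the arbitrary sleep (my assumption, not verified
behavior): SparkContext.stop() is supposed to drain the listener bus before
returning, so stopping the context explicitly may make the sleep unnecessary:

try {
  foo(sc)
} finally {
  sc.stop() // assumption: stop() flushes pending listener events
}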



On Sun, Apr 19, 2015 at 7:57 PM, Praveen Balaji <
secondorderpolynom...@gmail.com> wrote:

> [earlier messages quoted above; trimmed]