Spark cluster tuning recommendation

2016-07-11 Thread Kartik Mathur
I am trying to run terasort in Spark on a 7-node cluster with only 10g of
data, and executors get lost with a "GC overhead limit exceeded" error.

This is what my cluster looks like -


   - *Alive Workers:* 7
   - *Cores in use:* 28 Total, 2 Used
   - *Memory in use:* 56.0 GB Total, 1024.0 MB Used
   - *Applications:* 1 Running, 6 Completed
   - *Drivers:* 0 Running, 0 Completed
   - *Status:* ALIVE

Each worker has 4 cores and 8GB of memory (28 cores / 56 GB in total, as shown above).

My question is: how do people running in production decide these properties
-

1) --num-executors
2) --executor-cores
3) --executor-memory
4) num of partitions
5) spark.default.parallelism

Thanks,
Kartik
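
A rough sizing sketch for a cluster of this shape (taking the master UI totals
above: 28 cores / 56 GB across 7 workers). The exact numbers are assumptions
illustrating one common heuristic, not a recommendation from this thread:

// Standalone mode: --num-executors is YARN-oriented; here the usual approach is
// to cap the total cores for the application instead.
val conf = new org.apache.spark.SparkConf()
  .setAppName("terasort-10g")
  .set("spark.cores.max", "21")             // ~3 cores per worker, leaving 1 for the OS/daemons
  .set("spark.executor.cores", "3")         // cores per executor (--executor-cores)
  .set("spark.executor.memory", "6g")       // per-executor heap (--executor-memory), with headroom
  .set("spark.default.parallelism", "42")   // ~2x the total executor cores

// For the input, aim for at least as many partitions as total cores, e.g.
//   sc.textFile(path, minPartitions = 42)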


Re: Spark worker abruptly dying after 2 days

2016-02-14 Thread Kartik Mathur
Yes, you are right, I initially started it from the master node. What I am
interested in knowing is what happened suddenly after 2 days that made the
workers die. Is it possible that the workers got disconnected because of some
network issue and then tried restarting themselves but kept failing?

On Sun, Feb 14, 2016 at 11:21 PM, Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> Kartik,
>
>  Spark Workers won't start if SPARK_MASTER_IP is wrong. Maybe you
> used start-slaves.sh from the master node to start all the worker nodes, in
> which case the workers would have got the correct SPARK_MASTER_IP initially.
> Any later restart from the slave nodes would then have failed because of the
> wrong SPARK_MASTER_IP configured on the worker nodes.
>
>Check the logs of the other running workers to see which SPARK_MASTER_IP
> they have connected to; I don't think they are using a wrong master IP.
>
>
> Thanks,
> Prabhu Joseph
>
> On Mon, Feb 15, 2016 at 12:34 PM, Kartik Mathur <kar...@bluedata.com>
> wrote:
>
>> Thanks Prabhu ,
>>
>> I had wrongly configured spark_master_ip on the worker nodes to `hostname -f`,
>> which resolves to the worker itself and not the master,
>>
>> but now the question is *why the cluster was up initially for 2 days* and
>> the workers only noticed this invalid configuration after 2 days? And why
>> are the other workers still up even though they have the same setting?
>>
>> Really appreciate your help
>>
>> Thanks,
>> Kartik
>>
>> On Sun, Feb 14, 2016 at 10:53 PM, Prabhu Joseph <
>> prabhujose.ga...@gmail.com> wrote:
>>
>>> Kartik,
>>>
>>>The exception stack trace
>>> *java.util.concurrent.RejectedExecutionException* will happen if
>>> SPARK_MASTER_IP on the worker nodes is configured wrongly, for example if
>>> SPARK_MASTER_IP is set to the master node's hostname while the workers try
>>> to connect to the master node's IP. Check whether SPARK_MASTER_IP on the
>>> worker nodes is exactly the same as what the Spark Master GUI shows.
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>> On Mon, Feb 15, 2016 at 11:51 AM, Kartik Mathur <kar...@bluedata.com>
>>> wrote:
>>>
>>>> on spark 1.5.2
>>>> I have a Spark standalone cluster with 6 workers. I left the cluster
>>>> idle for 3 days, and after 3 days I saw only 4 workers on the Spark master
>>>> UI; 2 workers had died with the same exception -
>>>>
>>>> The strange part is that the cluster was running stable for 2 days, but on
>>>> the third day 2 workers abruptly died. I see this error in one of the
>>>> affected workers. No job ran for those 2 days.
>>>>
>>>>
>>>>
>>>> 2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
>>>> 2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
>>>> 2016-02-14 01:13:10 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception in thread Thread[sparkWorker-akka.actor.default-dispatcher-2,5,main]
>>>> java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@514b13ad rejected from java.util.concurrent.ThreadPoolExecutor@17f8ec8d[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 3]
>>>>   at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
>>>>   at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
>>>>   at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
>>>>   at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
>>>>   at org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$reregisterWithMaster$1.apply$mcV$sp(Worker.scala:269)
>>>>   at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
>>>>   at org.apache.spark.deploy.worker.Worker.org$apache$spark$deploy$worker$Worker$$reregisterWithMaster(Worker.scala:234)
>>>>   at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:521)
>>>>   at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
>>>>   at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithL

Re: Spark worker abruptly dying after 2 days

2016-02-14 Thread Kartik Mathur
Thanks Prabhu ,

I had wrongly configured spark_master_ip on the worker nodes to `hostname -f`,
which resolves to the worker itself and not the master,

but now the question is *why the cluster was up initially for 2 days* and the
workers only noticed this invalid configuration after 2 days? And why are the
other workers still up even though they have the same setting?

Really appreciate your help

Thanks,
Kartik

On Sun, Feb 14, 2016 at 10:53 PM, Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> Kartik,
>
>The exception stack trace
> *java.util.concurrent.RejectedExecutionException* will happen if
> SPARK_MASTER_IP on the worker nodes is configured wrongly, for example if
> SPARK_MASTER_IP is set to the master node's hostname while the workers try to
> connect to the master node's IP. Check whether SPARK_MASTER_IP on the worker
> nodes is exactly the same as what the Spark Master GUI shows.
>
>
> Thanks,
> Prabhu Joseph
>
> On Mon, Feb 15, 2016 at 11:51 AM, Kartik Mathur <kar...@bluedata.com>
> wrote:
>
>> on spark 1.5.2
>> I have a Spark standalone cluster with 6 workers. I left the cluster idle
>> for 3 days, and after 3 days I saw only 4 workers on the Spark master UI; 2
>> workers had died with the same exception -
>>
>> The strange part is that the cluster was running stable for 2 days, but on
>> the third day 2 workers abruptly died. I see this error in one of the
>> affected workers. No job ran for those 2 days.
>>
>>
>>
>> 2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
>> 2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
>> 2016-02-14 01:13:10 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception in thread Thread[sparkWorker-akka.actor.default-dispatcher-2,5,main]
>> java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@514b13ad rejected from java.util.concurrent.ThreadPoolExecutor@17f8ec8d[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 3]
>>   at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
>>   at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
>>   at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
>>   at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
>>   at org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$reregisterWithMaster$1.apply$mcV$sp(Worker.scala:269)
>>   at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
>>   at org.apache.spark.deploy.worker.Worker.org$apache$spark$deploy$worker$Worker$$reregisterWithMaster(Worker.scala:234)
>>   at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:521)
>>   at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
>>   at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
>>   at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
>>   at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
>>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>   at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>>   at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>   at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>   at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)

Spark worker abruptly dying after 2 days

2016-02-14 Thread Kartik Mathur
on spark 1.5.2
I have a Spark standalone cluster with 6 workers. I left the cluster idle for
3 days, and after 3 days I saw only 4 workers on the Spark master UI; 2
workers had died with the same exception -

The strange part is that the cluster was running stable for 2 days, but on the
third day 2 workers abruptly died. I see this error in one of the affected
workers. No job ran for those 2 days.



2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting for master to reconnect...
2016-02-14 01:13:10 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception in thread Thread[sparkWorker-akka.actor.default-dispatcher-2,5,main]
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@514b13ad rejected from java.util.concurrent.ThreadPoolExecutor@17f8ec8d[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 3]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
    at org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$reregisterWithMaster$1.apply$mcV$sp(Worker.scala:269)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
    at org.apache.spark.deploy.worker.Worker.org$apache$spark$deploy$worker$Worker$$reregisterWithMaster(Worker.scala:234)
    at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:521)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Re: Huge shuffle data size

2015-10-23 Thread Kartik Mathur
Don't use groupBy; use reduceByKey instead. groupBy should generally be
avoided because it leads to a lot of shuffle reads/writes.
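
At the RDD level, the difference this advice points at looks roughly like the
sketch below (a minimal illustration, assuming word-count-style (key, value)
pairs and a spark-shell SparkContext sc; the input path is hypothetical):

// reduceByKey combines values map-side before the shuffle, so only partial
// sums cross the network; groupByKey ships every raw value over the network
// and aggregates only after the shuffle.
val pairs = sc.textFile("hdfs:///some/input")          // hypothetical path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1L))

val viaReduce = pairs.reduceByKey(_ + _)               // small shuffle
val viaGroup  = pairs.groupByKey().mapValues(_.sum)    // much larger shuffle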

On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya 
wrote:

> Sorry, I sent the wrong join code snippet; the actual snippet is
>
> aggImpsDf.join(
>aggRevenueDf,
>aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>  && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>  && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>  && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>"inner")
>.select(
>  aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>  aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
> aggRevenueDf("metric2"))
>.coalesce(200)
>
>
> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya 
> wrote:
>
>> Hello,
>>
>> Data about my Spark job is below. My source data is only 916MB (stage 0)
>> and 231MB (stage 1), but when I join the two data sets (stage 2) it takes a
>> very long time, and as I can see the shuffled data is 614GB. Is this
>> expected? Both data sets produce 200 partitions.
>>
>> Stage Id | Description                     | Submitted           | Duration | Tasks: Succeeded/Total | Input    | Shuffle Read | Shuffle Write
>> 2        | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h    | 200/200                |          | 614.6 GB     |
>> 1        | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min  | 8/8                    | 916.2 MB |              | 3.9 MB
>> 0        | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s     | 3/3                    | 231.2 MB |              | 4.8 MB
>>
>> I am running Spark 1.4.1 and my code snippet which joins the two data sets
>> is:
>> two data sets is:
>>
>> hc.sql(query).
>> mapPartitions(iter => {
>>   iter.map {
>> case Row(
>>  ...
>>  ...
>>  ...
>> )
>>   }
>> }
>> ).toDF()
>> .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
>> .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>>   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>>
>>
>> Please advise on how to reduce the shuffle and speed this up.
>>
>>
>> ~Pratik
>>
>>


Re: How does shuffle work in spark ?

2015-10-20 Thread Kartik Mathur
That will depend on what your transformation is; your code snippet might
help.
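
For reference, a minimal sketch of the pattern described further down this
thread (partitionBy with the default hash partitioner, followed by an action).
The data and the partition count here are assumptions, purely for illustration:

import org.apache.spark.HashPartitioner

// partitionBy repartitions every record by key: the current stage writes its
// partitioned output to local disk (shuffle write) and the next stage fetches
// it over the network (shuffle read).
val pairs = sc.parallelize(1 to 1000000).map(i => (i, s"data-$i"))
val partitioned = pairs.partitionBy(new HashPartitioner(200))  // 200 partitions: an assumption
partitioned.count()  // the action that materialises the shuffle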



On Tue, Oct 20, 2015 at 1:53 AM, shahid ashraf <sha...@trialx.com> wrote:

> Hi
>
> Any idea why there is 50 GB of shuffle read and write for 3.3 GB of data?
>
> On Mon, Oct 19, 2015 at 11:58 PM, Kartik Mathur <kar...@bluedata.com>
> wrote:
>
>> That sounds like correct shuffle output. In Spark, the map and reduce
>> phases are separated by the shuffle: in the map phase each executor writes
>> to its local disk, and in the reduce phase the reducers read data from each
>> executor over the network, so the shuffle definitely hurts performance. For
>> more details on the Spark shuffle phase please read this:
>>
>> http://0x0fff.com/spark-architecture-shuffle/
>>
>> Thanks
>> Kartik
>>
>> On Mon, Oct 19, 2015 at 6:54 AM, shahid <sha...@trialx.com> wrote:
>>
>>> @all I did partitionBy using the default hash partitioner on data
>>> [(1,data), (2,data), ..., (n,data)].
>>> The total data was approx 3.5 GB; it showed a shuffle write of 50G, and on
>>> the next action, e.g. count, it is showing a shuffle read of 50 G. I don't
>>> understand this behaviour, and I think the performance is getting slow with
>>> so much shuffle read on the next transformation operations.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-tp584p25119.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> with Regards
> Shahid Ashraf
>


Spark Master Dying saying TimeoutException

2015-10-14 Thread Kartik Mathur
Hi,

I have some nightly jobs which run every night but sometimes die because of an
unresponsive master; the Spark master log says -

I am not seeing much else there. What could possibly cause an exception like
this?

*Exception in thread "main" java.util.concurrent.TimeoutException: Futures
timed out after [1 milliseconds]*

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)

at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

at scala.concurrent.Await$.result(package.scala:107)

at akka.remote.Remoting.start(Remoting.scala:180)

at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)

at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)

at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)

2015-10-14 05:43:04 ERROR Remoting:65 - Remoting error: [Startup timed out]
[

akka.remote.RemoteTransportException: Startup timed out

at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)

at akka.remote.Remoting.start(Remoting.scala:198)

at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)

at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)

at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)

at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)

at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)

at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)

at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)

at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)

at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)

at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)

at
org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:906)

at org.apache.spark.deploy.master.Master$.main(Master.scala:869)

at org.apache.spark.deploy.master.Master.main(Master.scala)

Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)

at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

at scala.concurrent.Await$.result(package.scala:107)

at akka.remote.Remoting.start(Remoting.scala:180)

... 17 more


Re: Spark Master Dying saying TimeoutException

2015-10-14 Thread Kartik Mathur
Retrying what? I want to know why it died, and what can I do to prevent it?
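
For what the reply below presumably means by retrying, a generic sketch of
wrapping the launch in a retry loop with backoff. This is purely illustrative
(not a Spark API), and launchNightlyJob() is a hypothetical placeholder:

// Retry a block up to `attempts` times on TimeoutException, doubling the
// wait between attempts.
def retry[T](attempts: Int, waitMs: Long)(body: => T): T =
  try body
  catch {
    case _: java.util.concurrent.TimeoutException if attempts > 1 =>
      Thread.sleep(waitMs)
      retry(attempts - 1, waitMs * 2)(body)
  }

// Hypothetical usage:
// retry(attempts = 3, waitMs = 5000) { launchNightlyJob() }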

On Wed, Oct 14, 2015 at 5:20 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> I fixed these timeout errors by retrying...
> On Oct 15, 2015 3:41 AM, "Kartik Mathur" <kar...@bluedata.com> wrote:
>
>> Hi,
>>
>> I have some nightly jobs which run every night but sometimes die because
>> of an unresponsive master; the Spark master log says -
>>
>> I am not seeing much else there. What could possibly cause an exception
>> like this?
>>
>> *Exception in thread "main" java.util.concurrent.TimeoutException:
>> Futures timed out after [1 milliseconds]*
>>
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>
>> at scala.concurrent.Await$.result(package.scala:107)
>>
>> at akka.remote.Remoting.start(Remoting.scala:180)
>>
>> at
>> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>>
>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>>
>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>>
>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>>
>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>>
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>>
>> 2015-10-14 05:43:04 ERROR Remoting:65 - Remoting error: [Startup timed
>> out] [
>>
>> akka.remote.RemoteTransportException: Startup timed out
>>
>> at
>> akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
>>
>> at akka.remote.Remoting.start(Remoting.scala:198)
>>
>> at
>> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>>
>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>>
>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>>
>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>>
>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>>
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>>
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
>>
>> at
>> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
>>
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
>>
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
>>
>> at
>> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
>>
>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>>
>> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
>>
>> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
>>
>> at
>> org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:906)
>>
>> at org.apache.spark.deploy.master.Master$.main(Master.scala:869)
>>
>> at org.apache.spark.deploy.master.Master.main(Master.scala)
>>
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [1 milliseconds]
>>
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>
>> at scala.concurrent.Await$.result(package.scala:107)
>>
>> at akka.remote.Remoting.start(Remoting.scala:180)
>>
>> ... 17 more
>>
>>
>>


Re: DEBUG level log in receivers and executors

2015-10-12 Thread Kartik Mathur
You can create a log4j.properties under your SPARK_HOME/conf and set up these
properties -

log4j.rootCategory=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender

log4j.appender.console.target=System.err

log4j.appender.console.layout=org.apache.log4j.PatternLayout

log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
%c{1}: %m%n


Do this across all workers/executors.



On Mon, Oct 12, 2015 at 4:09 PM, Spark Newbie 
wrote:

> Hi Spark users,
>
> Is there an easy way to turn on DEBUG logs in receivers and executors?
> Setting sparkContext.setLogLevel seems to turn on DEBUG level only on the
> Driver.
>
> Thanks,
>


Re: Problem understanding spark word count execution

2015-10-02 Thread Kartik Mathur
Thanks for the reply. To further clarify things -
>
> sc.textFile is reading from HDFS; now shouldn't the file be read in such a
> way that EACH executor works only on the local copy of the file part
> available to it? In this case it's a ~4.64 GB file and the block size is
> 256MB, so approx 19 partitions will be created and each task will run on 1
> partition (which is what I am seeing in the stage logs). I also assume it
> will read the file in a way that each executor has exactly the same amount
> of data, so there shouldn't be any shuffling in reading, at least.
>
> During the stage 0 (sc.textFile -> flatMap -> Map) for every task this is
> the output I am seeing
>
> Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Input Size / Records        | Shuffle Write Size / Records
> 0     | 44 | 0       | SUCCESS | NODE_LOCAL     | 1 / 10.35.244.10   | 2015/09/29 13:57:24 | 14 s     | 0.2 s   | 256.0 MB (hadoop) / 2595161 | 2.7 KB / 273
> 1     | 45 | 0       | SUCCESS | NODE_LOCAL     | 2 / 10.35.244.11   | 2015/09/29 13:57:24 | 13 s     | 0.2 s   | 256.0 MB (hadoop) / 2595176 | 2.7 KB / 273
> I have the following questions -
>
> 1) What exactly is the 2.7 KB of shuffle write?
> 2) Is this 2.7 KB of shuffle write local to that executor?
> 3) In the executors' logs I see 2000 bytes of results sent to the driver;
> if instead this number were much, much greater than 2000 bytes, such that it
> did not fit in the executor's memory, would the shuffle write increase?
> 4) For word count, 256 MB of data is a substantial amount of text; how come
> the result for this stage is only 2000 bytes? It should send every word with
> its respective count, so for a 256 MB input this result should be much bigger.
>
> I hope I am clear this time.
>
> Hope to get a reply,
>
> Thanks
> Kartik
>
>
>
> On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
> Hi,
>
> So you say " *sc.textFile -> flatMap -> Map".*
>
> *My understanding is like this:*
> *First step is a number of partitions are determined, p of them. You can
> give hint on this.*
> *Then the nodes which will load partitions p, that is n nodes (where
> n<=p).*
>
> *Relatively at the same time or not, the n nodes start opening different
> sections of the file - the physical equivalent of the partitions: for
> instance in HDFS they would do an open and a seek I guess and just read
> from the stream there, convert to whatever the InputFormat dictates.*
>
> The shuffle can only be the part when a node opens an HDFS file for
> instance but the node does not have a local replica of the blocks which it
> needs to read (those pertaining to his assigned partitions). So he needs to
> pick them up from remote nodes which do have replicas of that data.
>
> After blocks are read into memory, flatMap and Map are local computations
> generating new RDDs and in the end the result is sent to the driver
> (whatever termination computation does on the RDD like the result of
> reduce, or side effects of rdd.foreach, etc).
>
> Maybe you can share more of your context if still unclear.
> I just made assumptions to give clarity on a similar thing.
>
> Nicu
> --
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 10:25 PM
> *To:* Nicolae Marasoiu
> *Cc:* user
> *Subject:* Re: Problem understanding spark word count execution
>
> Thanks Nicolae ,
> So In my case all executers are sending results back to the driver and and
> "*shuffle* *is just sending out the textFile to distribute the
> partitions", *could you please elaborate on this  ? what exactly is in
> this file ?
>
> On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>
>
> Hi,
>
> 2- the end results are sent back to the driver; the shuffles are
> transmission of intermediate results between nodes such as the -> which are
> all intermediate transformations.
>
> More precisely, since flatMap and map are narrow dependencies, meaning
> they can usually happen on the local node, I bet shuffle is just sending
> out the textFile to a few nodes to distribute the partitions.
>
>
> --
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 12:42 AM
> *To:* user
> *Subject:* Problem understanding spark word count execution
>
> Hi All,
>
> I tried running spark word count and I have couple of questions -
>
> I am analyzing stage 0 , i.e
>  *sc.textFile -> flatMap -> Map (Word count example)*
>
> 1) In the *Stage logs* under the Application UI details, for every task I am
> seeing a shuffle write of 2.7 KB. *Question - how can I know where this task
> wrote to? e.g. how many bytes went to which executor?*
>
> 2) In the executor's log, when I look for the same task, it says 2000 bytes
> of results were sent to the driver. My question is, *if the results were
> sent directly to the driver, what is this shuffle write?*
>
> Thanks,
> Kartik
>
>
>
>
>


Re: Problem understanding spark word count execution

2015-10-02 Thread Kartik Mathur
My understanding is like this:*
> *First step is a number of partitions are determined, p of them. You can
> give hint on this.*
> *Then the nodes which will load partitions p, that is n nodes (where
> n<=p).*
>
> *Relatively at the same time or not, the n nodes start opening different
> sections of the file - the physical equivalent of the partitions: for
> instance in HDFS they would do an open and a seek I guess and just read
> from the stream there, convert to whatever the InputFormat dictates.*
>
> The shuffle can only be the part when a node opens an HDFS file for
> instance but the node does not have a local replica of the blocks which it
> needs to read (those pertaining to his assigned partitions). So he needs to
> pick them up from remote nodes which do have replicas of that data.
>
> After blocks are read into memory, flatMap and Map are local computations
> generating new RDDs and in the end the result is sent to the driver
> (whatever termination computation does on the RDD like the result of
> reduce, or side effects of rdd.foreach, etc).
>
> Maybe you can share more of your context if still unclear.
> I just made assumptions to give clarity on a similar thing.
>
> Nicu
> --
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 10:25 PM
> *To:* Nicolae Marasoiu
> *Cc:* user
> *Subject:* Re: Problem understanding spark word count execution
>
> Thanks Nicolae,
> So in my case all executors are sending results back to the driver, and
> "*shuffle* *is just sending out the textFile to distribute the
> partitions*" - could you please elaborate on this? What exactly is in
> this file?
>
> On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>
>
> Hi,
>
> 2- the end results are sent back to the driver; the shuffles are
> transmission of intermediate results between nodes such as the -> which are
> all intermediate transformations.
>
> More precisely, since flatMap and map are narrow dependencies, meaning
> they can usually happen on the local node, I bet shuffle is just sending
> out the textFile to a few nodes to distribute the partitions.
>
>
> --
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 12:42 AM
> *To:* user
> *Subject:* Problem understanding spark word count execution
>
> Hi All,
>
> I tried running spark word count and I have couple of questions -
>
> I am analyzing stage 0 , i.e
>  *sc.textFile -> flatMap -> Map (Word count example)*
>
> 1) In the *Stage logs* under the Application UI details, for every task I am
> seeing a shuffle write of 2.7 KB. *Question - how can I know where this task
> wrote to? e.g. how many bytes went to which executor?*
>
> 2) In the executor's log, when I look for the same task, it says 2000 bytes
> of results were sent to the driver. My question is, *if the results were
> sent directly to the driver, what is this shuffle write?*
>
> Thanks,
> Kartik
>
>
>
>


Re: Problem understanding spark word count execution

2015-10-01 Thread Kartik Mathur
Thanks Nicolae,
So in my case all executors are sending results back to the driver, and
"*shuffle* *is just sending out the textFile to distribute the
partitions*" - could you please elaborate on this? What exactly is in this
file?

On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

>
> Hi,
>
> 2- the end results are sent back to the driver; the shuffles are
> transmission of intermediate results between nodes such as the -> which are
> all intermediate transformations.
>
> More precisely, since flatMap and map are narrow dependencies, meaning
> they can usually happen on the local node, I bet shuffle is just sending
> out the textFile to a few nodes to distribute the partitions.
>
>
> --
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 12:42 AM
> *To:* user
> *Subject:* Problem understanding spark word count execution
>
> Hi All,
>
> I tried running spark word count and I have couple of questions -
>
> I am analyzing stage 0 , i.e
>  *sc.textFile -> flatMap -> Map (Word count example)*
>
> 1) In the *Stage logs* under the Application UI details, for every task I am
> seeing a shuffle write of 2.7 KB. *Question - how can I know where this task
> wrote to? e.g. how many bytes went to which executor?*
>
> 2) In the executor's log, when I look for the same task, it says 2000 bytes
> of results were sent to the driver. My question is, *if the results were
> sent directly to the driver, what is this shuffle write?*
>
> Thanks,
> Kartik
>


Shuffle Write v/s Shuffle Read

2015-10-01 Thread Kartik Mathur
Hi

I am trying to better understand shuffle in spark .

Based on my understanding thus far ,

*Shuffle Write*: writes the stage output of an intermediate stage to local
disk if memory is not sufficient.
For example, if each worker has 200 MB of memory for intermediate results and
the results are 300MB, then each executor *will keep 200 MB in memory and will
write the remaining 100 MB to local disk.*

*Shuffle Read*: each executor will read from the other executors' *memory +
disk, so the total read in the above case will be 300 (200 from memory and 100
from disk) * the number of executors?*

Is my understanding correct?

Thanks,
Kartik
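
For concreteness, the word-count pipeline discussed in the neighbouring
threads, with the shuffle boundary marked. This is a minimal sketch (the input
and output paths are hypothetical), not an answer to the question above:

// Stage 0 (map side): textFile -> flatMap -> map. Each task partitions its
// output by key and writes it to local disk; that local map output is what
// the UI reports as "Shuffle Write".
// Stage 1 (reduce side): the reduceByKey tasks fetch those blocks from the
// other executors over the network; that fetch is the "Shuffle Read".
val counts = sc.textFile("hdfs:///input/words.txt")    // hypothetical path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)                                   // shuffle boundary
counts.saveAsTextFile("hdfs:///output/word-counts")     // hypothetical path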


Re: Problem understanding spark word count execution

2015-10-01 Thread Kartik Mathur
Hi Nicolae,
Thanks for the reply. To further clarify things -

sc.textFile is reading from HDFS; now shouldn't the file be read in such a way
that EACH executor works only on the local copy of the file part available to
it? In this case it's a ~4.64 GB file and the block size is 256MB, so approx
19 partitions will be created and each task will run on 1 partition (which is
what I am seeing in the stage logs). I also assume it will read the file in a
way that each executor has exactly the same amount of data, so there shouldn't
be any shuffling in reading, at least.

During the stage 0 (sc.textFile -> flatMap -> Map) for every task this is
the output I am seeing

Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Input Size / Records        | Shuffle Write Size / Records
0     | 44 | 0       | SUCCESS | NODE_LOCAL     | 1 / 10.35.244.10   | 2015/09/29 13:57:24 | 14 s     | 0.2 s   | 256.0 MB (hadoop) / 2595161 | 2.7 KB / 273
1     | 45 | 0       | SUCCESS | NODE_LOCAL     | 2 / 10.35.244.11   | 2015/09/29 13:57:24 | 13 s     | 0.2 s   | 256.0 MB (hadoop) / 2595176 | 2.7 KB / 273
I have the following questions -

1) What exactly is the 2.7 KB of shuffle write?
2) Is this 2.7 KB of shuffle write local to that executor?
3) In the executors' logs I see 2000 bytes of results sent to the driver; if
instead this number were much, much greater than 2000 bytes, such that it did
not fit in the executor's memory, would the shuffle write increase?
4) For word count, 256 MB of data is a substantial amount of text; how come
the result for this stage is only 2000 bytes? It should send every word with
its respective count, so for a 256 MB input this result should be much bigger.

I hope I am clear this time.

Hope to get a reply,

Thanks
Kartik



On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
> So you say " *sc.textFile -> flatMap -> Map".*
>
> *My understanding is like this:*
> *First step is a number of partitions are determined, p of them. You can
> give hint on this.*
> *Then the nodes which will load partitions p, that is n nodes (where
> n<=p).*
>
> *Relatively at the same time or not, the n nodes start opening different
> sections of the file - the physical equivalent of the partitions: for
> instance in HDFS they would do an open and a seek I guess and just read
> from the stream there, convert to whatever the InputFormat dictates.*
>
> The shuffle can only be the part when a node opens an HDFS file for
> instance but the node does not have a local replica of the blocks which it
> needs to read (those pertaining to his assigned partitions). So he needs to
> pick them up from remote nodes which do have replicas of that data.
>
> After blocks are read into memory, flatMap and Map are local computations
> generating new RDDs and in the end the result is sent to the driver
> (whatever termination computation does on the RDD like the result of
> reduce, or side effects of rdd.foreach, etc).
>
> Maybe you can share more of your context if still unclear.
> I just made assumptions to give clarity on a similar thing.
>
> Nicu
> --
> *From:* Kartik Mathur <kar...@bluedata.com>
> *Sent:* Thursday, October 1, 2015 10:25 PM
> *To:* Nicolae Marasoiu
> *Cc:* user
> *Subject:* Re: Problem understanding spark word count execution
>
> Thanks Nicolae,
> So in my case all executors are sending results back to the driver, and
> "*shuffle* *is just sending out the textFile to distribute the
> partitions*" - could you please elaborate on this? What exactly is in
> this file?
>
> On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>>
>> Hi,
>>
>> 2- the end results are sent back to the driver; the shuffles are
>> transmission of intermediate results between nodes such as the -> which are
>> all intermediate transformations.
>>
>> More precisely, since flatMap and map are narrow dependencies, meaning
>> they can usually happen on the local node, I bet shuffle is just sending
>> out the textFile to a few nodes to distribute the partitions.
>>
>>
>> --
>> *From:* Kartik Mathur <kar...@bluedata.com>
>> *Sent:* Thursday, October 1, 2015 12:42 AM
>> *To:* user
>> *Subject:* Problem understanding spark word count execution
>>
>> Hi All,
>>
>> I tried running spark word count and I have couple of questions -
>>
>> I am analyzing stage 0 , i.e
>>  *sc.textFile -> flatMap -> Map (Word count example)*
>>
>> 1) In the *Stage logs* under the Application UI details, for every task I
>> am seeing a shuffle write of 2.7 KB. *Question - how can I know where this
>> task wrote to? e.g. how many bytes went to which executor?*
>>
>> 2) In the executor's log, when I look for the same task, it says 2000 bytes
>> of results were sent to the driver. My question is, *if the results were
>> sent directly to the driver, what is this shuffle write?*
>>
>> Thanks,
>> Kartik
>>
>
>


Problem understanding spark word count execution

2015-09-30 Thread Kartik Mathur
Hi All,

I tried running spark word count and I have couple of questions -

I am analyzing stage 0 , i.e
 *sc.textFile -> flatMap -> Map (Word count example)*

1) In the *Stage logs* under the Application UI details, for every task I am
seeing a shuffle write of 2.7 KB. *Question - how can I know where this task
wrote to? e.g. how many bytes went to which executor?*

2) In the executor's log, when I look for the same task, it says 2000 bytes of
results were sent to the driver. My question is, *if the results were sent
directly to the driver, what is this shuffle write?*

Thanks,
Kartik


Re: SQL queries in Spark / YARN

2015-09-28 Thread Kartik Mathur
Hey Robert, you could use Zeppelin instead if you don't want to use beeline.
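
For the "SQL queries in a Scala file" approach Robert mentions below, a
minimal sketch of what such a file could look like (Spark 1.x API; the object
name, query, and jar name are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object RunSql {
  def main(args: Array[String]): Unit = {
    // Build the contexts; on YARN the master is supplied by spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("sql-on-yarn"))
    val hc = new HiveContext(sc)
    hc.sql("SELECT count(*) FROM some_table").show()   // hypothetical query
    sc.stop()
  }
}

// Submitted with something like:
//   spark-submit --master yarn-client --class RunSql run-sql.jar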

On Monday, September 28, 2015, Robert Grandl 
wrote:

> Thanks Mark. Do you know how? In Spark standalone mode I use beeline to
> submit SQL scripts.
>
> In Spark/YARN, the only way I can see this working is via spark-submit.
> However, as it looks, I need to encapsulate the SQL queries in a Scala file.
> Do you have other suggestions?
>
> Thanks,
> Robert
>
>
>
> On Monday, September 28, 2015 2:47 PM, Mark Hamstra <
> m...@clearstorydata.com
> > wrote:
>
>
> Yes.
>
> On Mon, Sep 28, 2015 at 12:46 PM, Robert Grandl  > wrote:
>
> Hi guys,
>
> I was wondering if it's possible to submit SQL queries to Spark SQL, when
> Spark is running atop YARN instead of standalone mode.
>
> Thanks,
> Robert
>
>
>
>
>


Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-09-28 Thread Kartik Mathur
Hey Rick,
Not sure about this, but a similar situation happened to me: when starting
spark-shell it was starting a new cluster instead of using the existing one,
and this new cluster was a single-node cluster. That's why jobs were taking
forever to complete from spark-shell and were running much faster using
spark-submit (which reads the conf correctly), or Zeppelin for that matter.

Thanks,
Kartik

On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz  wrote:

> I've finally been able to pick this up again, after upgrading to Spark
> 1.4.1, because my code used the HiveContext, which runs fine in the REPL
> (be it via Zeppelin or the shell) but won't work with spark-submit.
> With 1.4.1, I have actually managed to get a result with the Spark shell,
> but after
> 3847,802237 seconds and in particular the last stage took 1320,672 seconds.
> This was after I used coalesce to balance the workload initially, since a
> Hive filter I applied normally would make for a skewed distribution of the
> data onto the nodes.
> Nonetheless, the same code (even without the coalesce) would work much
> faster in Zeppelin (around 1200 seconds with 1.4.0) and as a spark-submit
> job, the run time was just a tenth at
> 446,657534 seconds for the entire job and notably 38,961 seconds for the
> final stage.
>
> Again, there is a huge difference in the amount of data that gets
> shuffled/spilled (which leads to much earlier OOM-conditions), when using
> spark-shell.
> What could be the reason for this different behaviour using very similar
> configurations and identical data, machines and code (identical DAGs and
> sources) and identical spark binaries? Why would code launched from
> spark-shell generate more shuffled data for the same number of shuffled
> tuples?
>
> An analysis would be much appreciated.
>
> Best,
>
> Rick
>
> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz  wrote:
>
>> oops, forgot to reply-all on this thread.
>>
>> -- Forwarded message --
>> From: Rick Moritz 
>> Date: Wed, Aug 19, 2015 at 2:46 PM
>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
>> Spark-shell
>> To: Igor Berman 
>>
>>
>> Those values are not explicitly set, and attempting to read their values
>> results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
>> What I mean by the volume per element being larger is illustrated in my
>> original post: for each case the number of elements is identical, but the
>> volume of data required to obtain/manage these elements is many times
>> greater.
>>
>> The only difference used to be that Zeppelin had FAIR scheduling over
>> FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR
>> scheduling makes no difference. The only other difference in the
>> environment lies in some class-path variables which should only affect
>> method availability, not actual usage.
>>
>> Another fact to note: Spark assembly (1.4.0-rc4) was built with provided
>> hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0
>> -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean
>> package) for 2.6.0 from Hortonworks, while Zeppelin was built with
>> dependencies against 2.6.0 from Maven central.
>>
>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman 
>> wrote:
>>
>>> So what is your case for the version differences?
>>> What do you mean by "in spark-shell the volume per element is much
>>> larger"?
>>> Can you verify that the configuration in the Spark UI (under the
>>> Environment tab) is the same.
>>> if you suspect compression than check following properties:
>>> spark.shuffle.compress
>>> spark.shuffle.spill.compress
>>> spark.io.compression.codec
>>> spark.rdd.compress
>>>
>>>
>>>
>>> On 19 August 2015 at 15:03, Rick Moritz  wrote:
>>>
 Number of partitions and even size look relatively similar - except in
 spark-shell the volume per element is much larger, especially in later
 stages. That's when shuffles start to spill. Zeppelin creates almost no
 spills at all. The number of elements per partition are the same for both
 setups, but with very different data volume in/out. Almost as though
 compression was used in one case, and not in another, or as though
 shuffling is somehow less specific, and more nodes get data that they
 ultimately don't process at all. The same shuffling algorithm appears to be
 at work in each case, if the partitioning of the number of elements is
 anything to go by.

 On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman 
 wrote:

> I would compare the Spark UI metrics for both cases and look for any
> differences (number of partitions, number of spills, etc.).
> Why can't you make the REPL consistent with the Zeppelin Spark version?
> The RC might have issues...
>
>
>
>
> On 19 August 2015 at 14:42, Rick Moritz  

Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-09-28 Thread Kartik Mathur
OK, that might be possible. To confirm it, you can explicitly specify the
serializer in both cases (by setting spark.serializer, I guess). Then you can
be sure that the same serializer is used, and maybe then do an analysis.

Best,
Kartik
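
A sketch of pinning the serializer explicitly, as suggested above, so both
environments are known to use the same one (the choice of Kryo here is just an
example):

// Set once on the SparkConf used to build the context, or pass the same key
// via --conf / spark-defaults.conf so spark-shell and Zeppelin agree.
val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")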

On Mon, Sep 28, 2015 at 11:38 AM, Rick Moritz <rah...@gmail.com> wrote:

> Hi Kartik,
>
> Thanks for the input!
>
> Sadly, that's not it - I'm using YARN - the configuration looks identical,
> and the nodes/memory/cores are deployed identically and exactly as
> specified.
>
> My current hunch, is that for some reason different serializers are used
> in each case, but I can find no documentation on why that could be the
> case, and the configuration isn't indicative of that either.
> Nonetheless, the symptom of a different shuffle volume for the same number
> of shuffled tuples could well point to that as the source of my issue.
> In fact, a colleague pointed out that HIS (Cloudera) installation was
> defaulting to kryo for the spark-shell, which had an impact for some jobs.
> I couldn't find the document he was referring to as a source of this
> information, but the behavior sounds plausible at least.
>
> Best,
>
> Rick
>
>
> On Mon, Sep 28, 2015 at 8:24 PM, Kartik Mathur <kar...@bluedata.com>
> wrote:
>
>> Hey Rick ,
>> Not sure on this but similar situation happened with me, when starting
>> spark-shell it was starting a new cluster instead of using the existing
>> cluster and this new cluster was a single node cluster , that's why jobs
>> were taking forever to complete from spark-shell and were running much
>> faster using submit (which reads conf correctly) or zeppelin for that
>> matter.
>>
>> Thanks,
>> Kartik
>>
>> On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz <rah...@gmail.com> wrote:
>>
>>> I've finally been able to pick this up again, after upgrading to Spark
>>> 1.4.1, because my code used the HiveContext, which runs fine in the REPL
>>> (be it via Zeppelin or the shell) but won't work with spark-submit.
>>> With 1.4.1, I have actually managed to get a result with the Spark shell,
>>> but after
>>> 3847,802237 seconds and in particular the last stage took 1320,672
>>> seconds.
>>> This was after I used coalesce to balance the workload initially, since a
>>> Hive filter I applied normally would make for a skewed distribution of the
>>> data onto the nodes.
>>> Nonetheless, the same code (even without the coalesce) would work much
>>> faster in Zeppelin (around 1200 seconds with 1.4.0) and as a spark-submit
>>> job, the run time was just a tenth at
>>> 446,657534 seconds for the entire job and notably 38,961 seconds for the
>>> final stage.
>>>
>>> Again, there is a huge difference in the amount of data that gets
>>> shuffled/spilled (which leads to much earlier OOM-conditions), when using
>>> spark-shell.
>>> What could be the reason for this different behaviour using very similar
>>> configurations and identical data, machines and code (identical DAGs and
>>> sources) and identical spark binaries? Why would code launched from
>>> spark-shell generate more shuffled data for the same number of shuffled
>>> tuples?
>>>
>>> An analysis would be much appreciated.
>>>
>>> Best,
>>>
>>> Rick
>>>
>>> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:
>>>
>>>> oops, forgot to reply-all on this thread.
>>>>
>>>> -- Forwarded message --
>>>> From: Rick Moritz <rah...@gmail.com>
>>>> Date: Wed, Aug 19, 2015 at 2:46 PM
>>>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
>>>> Spark-shell
>>>> To: Igor Berman <igor.ber...@gmail.com>
>>>>
>>>>
>>>> Those values are not explicitly set, and attempting to read their
>>>> values results in 'java.util.NoSuchElementException:
>>>> spark.shuffle.spill.compress'.
>>>> What I mean by the volume per element being larger is illustrated in my
>>>> original post: for each case the number of elements is identical, but the
>>>> volume of data required to obtain/manage these elements is many times
>>>> greater.
>>>>
>>>> The only difference used to be that Zeppelin had FAIR scheduling over
>>>> FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR
>>>> scheduling makes no difference. The only other difference in the
>>>