Re: executor failures w/ scala 2.10

2013-10-29 Thread Prashant Sharma
Those log messages are new in Akka 2.2 and are usually seen when a node
is disassociated from another, either by a network failure or even a clean
shutdown. This suggests some network issue to me; are you running on EC2?
It might be a temporary thing in that case.

I would like more details on the long jobs, though. How long do they run?


On Wed, Oct 30, 2013 at 1:29 AM, Imran Rashid  wrote:

> We've been testing out the 2.10 branch of spark, and we're running into
> some issues where akka disconnects from the executors after a while.  We ran
> some simple tests first, and all was well, so we started upgrading our
> whole codebase to 2.10.  Everything seemed to be working, but then we
> noticed that when we run long jobs, things start failing.
>
>
> The first suspicious thing is that we get akka warnings about
> undeliverable messages sent to deadLetters:
>
> 2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO
> akka.actor.LocalActorRef - Message
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
> Actor[akka://spark/deadLetters] to
> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
> was not delivered. [4] dead letters encountered. This logging can be turned
> off or adjusted with configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> 2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO
> akka.actor.LocalActorRef - Message
> [akka.remote.transport.AssociationHandle$Disassociated] from
> Actor[akka://spark/deadLetters] to
> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
> was not delivered. [5] dead letters encountered. This logging can be turned
> off or adjusted with configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
>
>
> Generally within a few seconds after the first such message, there are a
> bunch more, and then the executor is marked as failed, and a new one is
> started:
>
> 2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO
> akka.actor.LocalActorRef - Message
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
> Actor[akka://spark/deadLetters] to
> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%
> 40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered. [10] dead
> letters encountered, no more dead letters will be logged. This logging can
> be turned off or adjusted with configuration settings
> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>
> 2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO
> org.apache.spark.deploy.client.Client$ClientActor - Executor updated:
> app-2013102911-/1 is now FAILED (Command exited with code 1)
>
> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO
> org.apache.spark.deploy.client.Client$ClientActor - Executor added:
> app-2013102911-/2 on
> worker-20131029105824-dhd2.quantifind.com-51544 (dhd2.quantifind.com:51544)
> with 24 cores
>
> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18] ERROR
> akka.remote.EndpointWriter - AssociationError [akka.tcp://
> sp...@ddd0.quantifind.com:43068] -> [akka.tcp://
> sparkexecu...@dhd2.quantifind.com:45794]: Error [Association failed with
> [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: dhd2.quantifind.com/10.10.5.64:45794]
>
>
>
> Looking in the logs of the failed executor, there are some similar
> messages about undeliverable messages, but I don't see any obvious cause:
>
> 13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943
>
> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.actor.FSM$Timer]
> from Actor[akka://sparkExecutor/deadLetters] to
> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1] dead
> letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
> [akka.remote.transport.AssociationHandle$Disassociated] from
> Actor[akka://sparkExecutor/deadLetters] to
> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2] dead
> letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
>

RE: spark-0.8.0 and hadoop-2.1.0-beta

2013-10-29 Thread Liu, Raymond
I am also working on porting the trunk code onto 2.2.0. There seem to be quite a few
API changes, but many of them are just renames.
Yarn 2.1.0-beta also adds some client APIs for easier interaction with the YARN
framework, but there are not many examples of how to use them (the API and wiki
docs are both old and do not reflect the new API), so some parts of the Spark YARN
code will need to be rewritten with the new client API.
I am also not quite familiar with the user certification part of the code; it might
take some time, since it seems that part of the code also changed a bit: some
methods are gone, and I cannot find their replacements, or they are no longer needed.


Best Regards,
Raymond Liu

From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
Sent: Wednesday, October 30, 2013 2:35 AM
To: user@spark.incubator.apache.org
Subject: Re: spark-0.8.0 and hadoop-2.1.0-beta

I'm curious, Viren, do you have a patch you could post to build this against 
YARN 2.1 / 2.2? It would be nice to see how big the changes are.

Matei

On Sep 30, 2013, at 10:14 AM, viren kumar  wrote:


I was able to get Spark 0.8.0 to compile with Hadoop/Yarn 2.1.0-beta, by 
following some of the changes described here: 
http://hortonworks.com/blog/stabilizing-yarn-apis-for-apache-hadoop-2-beta-and-beyond/
That should help you build most of it. One change not covered there is the 
change from ProtoUtils.convertFromProtoFormat(containerToken, cmAddress) to 
ConverterUtils.convertFromYarn(containerToken, cmAddress).
Not 100% sure that my changes are correct. 
Hope that helps,
Viren

On Sun, Sep 29, 2013 at 8:59 AM, Matei Zaharia  wrote:
Hi Terence,

YARN's API changed in an incompatible way in Hadoop 2.1.0, so I'd suggest 
sticking with 2.0.x for now. We may create a different branch for this version. 
Unfortunately due to the API change it may not be possible to support this 
version while also supporting other widely-used versions like 0.23.x.

Matei

On Sep 29, 2013, at 11:00 AM, Terance Dias  wrote:

>
> Hi, I'm trying to build spark-0.8.0 with hadoop-2.1.0-beta.
> I have changed the following properties in SparkBuild.scala file.
>
> val DEFAULT_HADOOP_VERSION = "2.1.0-beta"
> val DEFAULT_YARN = true
>
> when I run sbt clean compile, I get an error saying
>
> [error] 
> /usr/local/spark-0.8.0-incubating/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:42:
>  not found: type AMRMProtocol
> [error]   private var resourceManager: AMRMProtocol = null
>
> Thanks,
> Terance.
>




Re: Spark cluster memory configuration for spark-shell

2013-10-29 Thread Aaron Davidson
You are correct. If you are just using spark-shell in local mode (i.e.,
without a cluster), you can set the SPARK_MEM environment variable to give
the driver more memory. E.g.:
SPARK_MEM=24g ./spark-shell

Otherwise, if you're using a real cluster, the driver shouldn't require a
significant amount of memory, so SPARK_MEM should not have to be used.
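
For the executors on a real cluster, the property the original question asks about
can be set before the SparkContext is created. Below is a minimal sketch, assuming
the 0.8-era convention of configuring Spark through Java system properties in a
standalone program; the master URL, app name, path, and 24g value are illustrative
placeholders, not recommendations from this thread:

// Hedged sketch: 0.8-era configuration via system properties.
import org.apache.spark.SparkContext

object MemoryConfigExample {
  def main(args: Array[String]) {
    // Must be set before the SparkContext is constructed to take effect.
    System.setProperty("spark.executor.memory", "24g")
    val sc = new SparkContext("spark://master:7077", "memory-config-example")
    println(sc.textFile("hdfs://namenode/path/to/data").count())
    sc.stop()
  }
}

For the interactive shell itself, SPARK_MEM (as shown above) is the simpler knob,
since the shell constructs its SparkContext before any user code runs.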


On Tue, Oct 29, 2013 at 12:40 PM, Soumya Simanta
wrote:

> I'm new to Spark. I want to try out a few simple examples from the Spark
> shell. However, I'm not sure how to configure it so that I can make the
> max. use of memory on my workers.
>
> On average I've around 48 GB of RAM on each node on my cluster. I've
> around 10 nodes.
>
> Based on the documentation, I could find memory-based configuration in two
> places.
>
> *1. $SPARK_INSTALL_DIR/dist/conf/spark-env.sh *
>
> *SPARK_WORKER_MEMORY*: Total amount of memory to allow Spark applications
> to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB);
> note that each application's *individual* memory is configured using its
> spark.executor.memory property.
> *2. spark.executor.memory JVM flag. *
>  spark.executor.memory (default: 512m): Amount of memory to use per executor process,
> in the same format as JVM memory strings (e.g. 512m, 2g).
>
> http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties
>
> In my case I want to use the max. memory possible on each node. My
> understanding is that I don't have to change *SPARK_WORKER_MEMORY *and I
> will have to increase spark.executor.memory to something big (e.g., 24g or
> 32g). Is this correct? If yes, what is the correct way of setting this
> property if I just want to use the spark-shell?
>
>
> Thanks.
> -Soumya
>
>


Re: Getting exception org.apache.spark.SparkException: Job aborted: Task 1.0:37 failed more than 4 times

2013-10-29 Thread Sergey Soldatov
You may check the 'out' files in the logs directory for the failure details.


On Wed, Oct 30, 2013 at 12:17 AM, Soumya Simanta
wrote:

> I'm using a pretty recent version of Spark (> 0.8) from Github and it's
> failing with the following exception for a very simple task on the
> spark-shell.
>
>
> *scala> val file = sc.textFile("hdfs://...")*
> file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
>
> *scala> val errors = file.filter(line => line.contains("sometext"))*
> errors: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14
>
> *scala> errors.count()*
> org.apache.spark.SparkException: Job aborted: Task 0.0:32 failed more than
> 4 times
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:819)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:817)
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:817)
> at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:432)
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:494)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)
>


Spark Checkpointing Bug

2013-10-29 Thread Craig Vanderborgh
I have disabled checkpointing in my Spark Streaming application with:

ssc.checkpoint(null)

This application does not need checkpointing, and there is no operation
that I'm doing (only filter operations, and, importantly, NO WINDOWING)
that should need checkpointing.  And yet I am getting this:

2013-10-29 14:47:22 ERROR LocalScheduler - Exception in task 3
java.lang.Exception: Could not compute split, block input-0-1383076026200
not found
at spark.rdd.BlockRDD.compute(BlockRDD.scala:32)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.FilteredRDD.compute(FilteredRDD.scala:15)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.UnionPartition.iterator(UnionRDD.scala:12)
at spark.rdd.UnionRDD.compute(UnionRDD.scala:52)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
at
spark.scheduler.local.LocalScheduler.runTask$1(LocalScheduler.scala:76)
at
spark.scheduler.local.LocalScheduler$$anon$1.run(LocalScheduler.scala:49)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Why might I be getting this exception?

Thanks,
Craig Vanderborgh


Re: met a problem while running a streaming example program

2013-10-29 Thread dachuan
Yes, it works after checking out branch-0.8.

thanks.


On Tue, Oct 29, 2013 at 12:51 PM, Patrick Wendell wrote:

> If you just add the "extends Serializable" changes from here it should
> work.
>
> On Tue, Oct 29, 2013 at 9:36 AM, Patrick Wendell 
> wrote:
> > This was fixed on 0.8 branch and master:
> > https://github.com/apache/incubator-spark/pull/63/files
> >
> > - Patrick
> >
> > On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges
> >  wrote:
> >> I vaguely remember running into this same error. It says there
> >> "java.io.NotSerializableException:
> >> org.apache.spark.streaming.examples.clickstream.PageView"... can you
> >> check the PageView class in the examples and make sure it has the
> >> @serializable directive? I seem to remember having to add it.
> >>
> >> good luck,
> >> Thunder
> >>
> >>
> >> On Tue, Oct 29, 2013 at 6:54 AM, dachuan  wrote:
> >>> Hi,
> >>>
> >>> I have tried the clickstream example, it runs into an exception,
> anybody met
> >>> this before?
> >>>
> >>> Since the program mentioned "local[2]", so I run it in my local
> machine.
> >>>
> >>> thanks in advance,
> >>> dachuan.
> >>>
> >>> Log Snippet 1:
> >>>
> >>> 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing
> tasks
> >>> from Stage 12 (MapPartitionsRDD[63] at combineByKey at
> >>> ShuffledDStream.scala:41)
> >>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is
> 4230
> >>> bytes
> >>> 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75
> >>> 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0
> >>> 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition
> >>> org.apache.spark.rdd.BlockRDDPartition@0
> >>> 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0
> failed
> >>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to
> >>> java.io.NotSerializableException
> >>> java.io.NotSerializableException:
> >>> org.apache.spark.streaming.examples.clickstream.PageView
> >>>
> >>> Log Snippet 2:
> >>> org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more
> than 4
> >>> times; aborting job java.io.NotSerializableException:
> >>> org.apache.spark.streaming.examples.clickstream.PageView
> >>> at
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >>> at
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >>> at
> >>>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >>> at
> >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>> at
> >>>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >>> at
> >>>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
> >>> at
> >>> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >>> at
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >>>
> >>> Two commands that run this app:
> >>> ./run-example
> >>> org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4
> 10
> >>> ./run-example
> org.apache.spark.streaming.examples.clickstream.PageViewStream
> >>> errorRatePerZipCode localhost 4
> >>>
>



-- 
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210


Spark v0.7.3: spark.SparkException Bug

2013-10-29 Thread Craig Vanderborgh
This is a bug report on an internal Spark problem.  This exception results
in the death of the task in which it arises, but somehow Spark seems to
recover and soldier on.

Here is the exception, which is completely outside of any user-level code:

java.lang.NullPointerException
2013-10-29 14:38:20 ERROR JobManager - Running streaming job 2425 @
138307910 ms failed
spark.SparkException: Job failed: ShuffleMapTask(3846, 1) failed:
ExceptionFailure(java.lang.NullPointerException,java.lang.NullPointerException,[Ljava.lang.StackTraceElement;@4d5c3086
)
at
spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
at
spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
at
spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:601)
at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:300)
at
spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
2013-10-29 14:38:20 ERROR LocalScheduler - Exception in task 1

Thanks,
Craig Vanderborgh


Getting exception org.apache.spark.SparkException: Job aborted: Task 1.0:37 failed more than 4 times

2013-10-29 Thread Soumya Simanta
I'm using a pretty recent version of Spark (> 0.8) from Github and it's
failing with the following exception for a very simple task on the
spark-shell.


*scala> val file = sc.textFile("hdfs://...")*
file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

*scala> val errors = file.filter(line => line.contains("sometext"))*
errors: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14

*scala> errors.count()*
org.apache.spark.SparkException: Job aborted: Task 0.0:32 failed more than
4 times
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:819)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:817)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:817)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:432)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:494)
at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)


executor failures w/ scala 2.10

2013-10-29 Thread Imran Rashid
We've been testing out the 2.10 branch of spark, and we're running into
some issues where akka disconnects from the executors after a while.  We ran
some simple tests first, and all was well, so we started upgrading our
whole codebase to 2.10.  Everything seemed to be working, but then we
noticed that when we run long jobs, things start failing.


The first suspicious thing is that we get akka warnings about undeliverable
messages sent to deadLetters:

2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
was not delivered. [4] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
was not delivered. [5] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.



Generally within a few seconds after the first such message, there are a
bunch more, and then the executor is marked as failed, and a new one is
started:

2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%
40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered. [10] dead
letters encountered, no more dead letters will be logged. This logging can
be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO
org.apache.spark.deploy.client.Client$ClientActor - Executor updated:
app-2013102911-/1 is now FAILED (Command exited with code 1)

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO
org.apache.spark.deploy.client.Client$ClientActor - Executor added:
app-2013102911-/2 on
worker-20131029105824-dhd2.quantifind.com-51544 (dhd2.quantifind.com:51544)
with 24 cores

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18] ERROR
akka.remote.EndpointWriter - AssociationError [akka.tcp://
sp...@ddd0.quantifind.com:43068] -> [akka.tcp://
sparkexecu...@dhd2.quantifind.com:45794]: Error [Association failed with
[akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: dhd2.quantifind.com/10.10.5.64:45794]



Looking in the logs of the failed executor, there are some similar messages
about undeliverable messages, but I don't see any obvious cause:

13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.actor.FSM$Timer]
from Actor[akka://sparkExecutor/deadLetters] to
Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1] dead
letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://sparkExecutor/deadLetters] to
Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2] dead
letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://sparkExecutor/deadLetters] to
Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [3] dead
letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: D

Spark cluster memory configuration for spark-shell

2013-10-29 Thread Soumya Simanta
I'm new to Spark. I want to try out a few simple examples from the Spark
shell. However, I'm not sure how to configure it so that I can make the
max. use of memory on my workers.

On average I've around 48 GB of RAM on each node on my cluster. I've around
10 nodes.

Based on the documentation, I could find memory-based configuration in two
places.

*1. $SPARK_INSTALL_DIR/dist/conf/spark-env.sh *

*SPARK_WORKER_MEMORY*: Total amount of memory to allow Spark applications to
use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note
that each application's *individual* memory is configured using its
spark.executor.memory property.
*2. spark.executor.memory JVM flag. *
spark.executor.memory (default: 512m): Amount of memory to use per executor process, in
the same format as JVM memory strings (e.g. 512m, 2g).
http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties

In my case I want to use the max. memory possible on each node. My
understanding is that I don't have to change *SPARK_WORKER_MEMORY *and I
will have to increase spark.executor.memory to something big (e.g., 24g or
32g). Is this correct? If yes, what is the correct way of setting this
property if I just want to use the spark-shell?


Thanks.
-Soumya


Re: spark-0.8.0 and hadoop-2.1.0-beta

2013-10-29 Thread Matei Zaharia
I’m curious, Viren, do you have a patch you could post to build this against 
YARN 2.1 / 2.2? It would be nice to see how big the changes are.

Matei

On Sep 30, 2013, at 10:14 AM, viren kumar  wrote:

> I was able to get Spark 0.8.0 to compile with Hadoop/Yarn 2.1.0-beta, by 
> following some of the changes described here: 
> http://hortonworks.com/blog/stabilizing-yarn-apis-for-apache-hadoop-2-beta-and-beyond/
> 
> That should help you build most of it. One change not covered there is the 
> change from ProtoUtils.convertFromProtoFormat(containerToken, cmAddress) to 
> ConverterUtils.convertFromYarn(containerToken, cmAddress).
> 
> Not 100% sure that my changes are correct. 
> 
> Hope that helps,
> Viren
> 
> 
> On Sun, Sep 29, 2013 at 8:59 AM, Matei Zaharia  
> wrote:
> Hi Terence,
> 
> YARN's API changed in an incompatible way in Hadoop 2.1.0, so I'd suggest 
> sticking with 2.0.x for now. We may create a different branch for this 
> version. Unfortunately due to the API change it may not be possible to 
> support this version while also supporting other widely-used versions like 
> 0.23.x.
> 
> Matei
> 
> On Sep 29, 2013, at 11:00 AM, Terance Dias  wrote:
> 
> >
> > Hi, I'm trying to build spark-0.8.0 with hadoop-2.1.0-beta.
> > I have changed the following properties in SparkBuild.scala file.
> >
> > val DEFAULT_HADOOP_VERSION = "2.1.0-beta"
> > val DEFAULT_YARN = true
> >
> > when I run sbt clean compile, I get an error saying
> >
> > [error] 
> > /usr/local/spark-0.8.0-incubating/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:42:
> >  not found: type AMRMProtocol
> > [error]   private var resourceManager: AMRMProtocol = null
> >
> > Thanks,
> > Terance.
> >
> 
> 



Re: met a problem while running a streaming example program

2013-10-29 Thread Patrick Wendell
If you just add the "extends Serializable" changes from here it should work.
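
For readers of the archive, here is a minimal sketch of the kind of change being
referenced, with illustrative field names rather than the exact source of the
example class; the real fix is in the pull request quoted below:

// Illustrative only; field names are assumptions, not copied from the example source.
class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
  extends Serializable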

On Tue, Oct 29, 2013 at 9:36 AM, Patrick Wendell  wrote:
> This was fixed on 0.8 branch and master:
> https://github.com/apache/incubator-spark/pull/63/files
>
> - Patrick
>
> On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges
>  wrote:
>> I vaguely remember running into this same error. It says there
>> "java.io.NotSerializableException:
>> org.apache.spark.streaming.examples.clickstream.PageView"... can you
>> check the PageView class in the examples and make sure it has the
>> @serializable directive? I seem to remember having to add it.
>>
>> good luck,
>> Thunder
>>
>>
>> On Tue, Oct 29, 2013 at 6:54 AM, dachuan  wrote:
>>> Hi,
>>>
>>> I have tried the clickstream example, it runs into an exception, anybody met
>>> this before?
>>>
>>> Since the program mentioned "local[2]", so I run it in my local machine.
>>>
>>> thanks in advance,
>>> dachuan.
>>>
>>> Log Snippet 1:
>>>
>>> 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks
>>> from Stage 12 (MapPartitionsRDD[63] at combineByKey at
>>> ShuffledDStream.scala:41)
>>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230
>>> bytes
>>> 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75
>>> 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0
>>> 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition
>>> org.apache.spark.rdd.BlockRDDPartition@0
>>> 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed
>>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to
>>> java.io.NotSerializableException
>>> java.io.NotSerializableException:
>>> org.apache.spark.streaming.examples.clickstream.PageView
>>>
>>> Log Snippet 2:
>>> org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4
>>> times; aborting job java.io.NotSerializableException:
>>> org.apache.spark.streaming.examples.clickstream.PageView
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>
>>> Two commands that run this app:
>>> ./run-example
>>> org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10
>>> ./run-example org.apache.spark.streaming.examples.clickstream.PageViewStream
>>> errorRatePerZipCode localhost 4
>>>


Re: met a problem while running a streaming example program

2013-10-29 Thread Patrick Wendell
This was fixed on 0.8 branch and master:
https://github.com/apache/incubator-spark/pull/63/files

- Patrick

On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges
 wrote:
> I vaguely remember running into this same error. It says there
> "java.io.NotSerializableException:
> org.apache.spark.streaming.examples.clickstream.PageView"... can you
> check the PageView class in the examples and make sure it has the
> @serializable directive? I seem to remember having to add it.
>
> good luck,
> Thunder
>
>
> On Tue, Oct 29, 2013 at 6:54 AM, dachuan  wrote:
>> Hi,
>>
>> I have tried the clickstream example, it runs into an exception, anybody met
>> this before?
>>
>> Since the program mentioned "local[2]", so I run it in my local machine.
>>
>> thanks in advance,
>> dachuan.
>>
>> Log Snippet 1:
>>
>> 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks
>> from Stage 12 (MapPartitionsRDD[63] at combineByKey at
>> ShuffledDStream.scala:41)
>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230
>> bytes
>> 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75
>> 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0
>> 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition
>> org.apache.spark.rdd.BlockRDDPartition@0
>> 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed
>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to
>> java.io.NotSerializableException
>> java.io.NotSerializableException:
>> org.apache.spark.streaming.examples.clickstream.PageView
>>
>> Log Snippet 2:
>> org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4
>> times; aborting job java.io.NotSerializableException:
>> org.apache.spark.streaming.examples.clickstream.PageView
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
>> at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>
>> Two commands that run this app:
>> ./run-example
>> org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10
>> ./run-example org.apache.spark.streaming.examples.clickstream.PageViewStream
>> errorRatePerZipCode localhost 4
>>


Re: met a problem while running a streaming example program

2013-10-29 Thread Thunder Stumpges
I vaguely remember running into this same error. It says there
"java.io.NotSerializableException:
org.apache.spark.streaming.examples.clickstream.PageView"... can you
check the PageView class in the examples and make sure it has the
@serializable directive? I seem to remember having to add it.

good luck,
Thunder


On Tue, Oct 29, 2013 at 6:54 AM, dachuan  wrote:
> Hi,
>
> I have tried the clickstream example; it runs into an exception. Has anybody met
> this before?
>
> Since the program mentions "local[2]", I ran it on my local machine.
>
> thanks in advance,
> dachuan.
>
> Log Snippet 1:
>
> 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks
> from Stage 12 (MapPartitionsRDD[63] at combineByKey at
> ShuffledDStream.scala:41)
> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230
> bytes
> 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75
> 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0
> 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition
> org.apache.spark.rdd.BlockRDDPartition@0
> 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed
> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to
> java.io.NotSerializableException
> java.io.NotSerializableException:
> org.apache.spark.streaming.examples.clickstream.PageView
>
> Log Snippet 2:
> org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4
> times; aborting job java.io.NotSerializableException:
> org.apache.spark.streaming.examples.clickstream.PageView
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>
> Two commands that run this app:
> ./run-example
> org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10
> ./run-example org.apache.spark.streaming.examples.clickstream.PageViewStream
> errorRatePerZipCode localhost 4
>


Re: oome from blockmanager

2013-10-29 Thread Aaron Davidson
Great! Glad to hear it worked out. Spark definitely has a pain point about
deciding the right number of partitions, and I think we're going to be
spending a lot of time trying to reduce that issue.

Currently working on the patch to reduce the shuffle file block overheads,
but in the meantime, you can set "spark.shuffle.consolidateFiles=false" to
exchange OOMEs due to too many partitions for worse performance (probably
an acceptable tradeoff).
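
A minimal sketch of the two adjustments discussed in this thread, assuming the
0.8-era convention of setting Spark properties via system properties before the
SparkContext exists; the sizes and the S3 path are illustrative, not taken from
the original report:

// Hedged sketch: disable shuffle file consolidation as suggested above, and
// repartition the input so each partition is roughly a target size.
System.setProperty("spark.shuffle.consolidateFiles", "false")

val targetPartitionBytes = 600L * 1024 * 1024        // ~600 MB per partition
val totalInputBytes      = 500L * 1024 * 1024 * 1024 // ~500 GB of input (illustrative)
val numPartitions        = (totalInputBytes / targetPartitionBytes).toInt

// sc is an existing SparkContext (e.g. the spark-shell's); the path is a placeholder.
val coalesced = sc.textFile("s3n://some-bucket/some-prefix/*").coalesce(numPartitions)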



On Mon, Oct 28, 2013 at 2:31 PM, Stephen Haberman <
stephen.haber...@gmail.com> wrote:

> Hey guys,
>
> As a follow up, I raised our target partition size to 600mb (up from
> 64mb), which split this report's 500gb of tiny S3 files into ~700
> partitions, and everything ran much smoother.
>
> In retrospect, this was the same issue we'd run into before, having too
> many partitions, and had previously solved by throwing some guesses at
> coalesce to make it magically go away.
>
> But now I feel like we have a much better understanding of why the
> numbers need to be what they are, which is great.
>
> So, thanks for all the input and helping me understand what's going on.
>
> It'd be great to see some of the optimizations to BlockManager happen,
> but I understand in the end why it needs to track what it does. And I
> was also admittedly using a small cluster anyway.
>
> - Stephen
>
>


Re: compare/contrast Spark with Cascading

2013-10-29 Thread Koert Kuipers
Hey Prashant,
I assume you mean steps to reproduce the OOM. I do not currently have any. I just
ran into the OOMs when porting some jobs from map-red. I never turned it into a
reproducible test, and I do not rule out that it was my poor programming
that caused it. However, it happened with a bunch of jobs, and then I asked
on the message boards about the OOM, and people pointed me to the
assumption about reducer input having to fit in memory. At that point I
felt that was too much of a limitation for the jobs I was trying to
port, and I gave up.
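
For context, here is a rough sketch of the kind of adjustment discussed further
down in this thread: DISK_ONLY caching plus a larger number of reduce tasks, so
that no single reduce task's input has to fit in memory. It is an assumption-laden
illustration, not code from the original jobs; the path, key extraction, and
partition count are placeholders:

// Hedged sketch of the workaround described in the quoted discussion below.
import org.apache.spark.SparkContext._          // pair-RDD operations
import org.apache.spark.storage.StorageLevel

// sc is an existing SparkContext (e.g. the spark-shell's).
val data  = sc.textFile("hdfs://namenode/big/input").persist(StorageLevel.DISK_ONLY)
val pairs = data.map(line => (line.split("\t")(0), 1L))
// An explicit (large) partition count keeps each reduce task's input small.
val counts = pairs.reduceByKey(_ + _, 2000)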


On Tue, Oct 29, 2013 at 1:12 AM, Prashant Sharma wrote:

> Hey Koert,
>
> Can you give me steps to reproduce this ?
>
>
> On Tue, Oct 29, 2013 at 10:06 AM, Koert Kuipers  wrote:
>
>> Matei,
>> We have some jobs where even the input for a single key in a groupBy
>> would not fit in the task's memory. We rely on mapred to stream from
>> disk to disk as it reduces.
>> I think spark should be able to handle that situation to truly be able to
>> claim it can replace map-red (or not?).
>> Best, Koert
>>
>>
>> On Mon, Oct 28, 2013 at 8:51 PM, Matei Zaharia 
>> wrote:
>>
>>> FWIW, the only thing that Spark expects to fit in memory if you use
>>> DISK_ONLY caching is the input to each reduce task. Those currently don't
>>> spill to disk. The solution if datasets are large is to add more reduce
>>> tasks, whereas Hadoop would run along with a small number of tasks that do
>>> lots of disk IO. But this is something we will likely change soon. Other
>>> than that, everything runs in a streaming fashion and there's no need for
>>> the data to fit in memory. Our goal is certainly to work on any size
>>> datasets, and some of our current users are explicitly using Spark to
>>> replace things like Hadoop Streaming in just batch jobs (see e.g. Yahoo!'s
>>> presentation from http://ampcamp.berkeley.edu/3/). If you run into
>>> trouble with these, let us know, since it is an explicit goal of the
>>> project to support it.
>>>
>>> Matei
>>>
>>> On Oct 28, 2013, at 5:32 PM, Koert Kuipers  wrote:
>>>
>>> no problem :) i am actually not familiar with what oscar has said on
>>> this. can you share or point me to the conversation thread?
>>>
>>> it is my opinion based on the little experimenting i have done. but i am
>>> willing to be convinced otherwise.
>>> one the very first things i did when we started using spark is run jobs
>>> with DISK_ONLY, and see if it could some of the jobs that map-reduce does
>>> for us. however i ran into OOMs, presumably because spark makes assumptions
>>> that some things should fit in memory. i have to admit i didn't try too
>>> hard after the first OOMs.
>>>
>>> if spark were able to scale from the quick in-memory query to the
>>> overnight disk-only giant batch query, i would love it! spark has a much
>>> nicer api than map-red, and one could use a single set of algos for
>>> everything from quick/realtime queries to giant batch jobs. as far as i am
>>> concerned map-red would be done. our clusters of the future would be hdfs +
>>> spark.
>>>
>>>
>>> On Mon, Oct 28, 2013 at 8:16 PM, Mark Hamstra 
>>> wrote:
>>>
 And I didn't mean to skip over you, Koert.  I'm just more familiar with
 what Oscar said on the subject than with your opinion.



 On Mon, Oct 28, 2013 at 5:13 PM, Mark Hamstra 
 wrote:

> Hmmm... I was unaware of this concept that Spark is for medium to
>> large datasets but not for very large datasets.
>
>
> It is in the opinion of some at Twitter.  That doesn't make it true or
> a universally held opinion.
>
>
>
> On Mon, Oct 28, 2013 at 5:08 PM, Ashish Rangole wrote:
>
>> Hmmm... I was unaware of this concept that Spark is for medium to
>> large datasets but not for very large datasets. What size is very large?
>>
>> Can someone please elaborate on why this would be the case and what
>> stops Spark, as it is today, to be successfully run on very large 
>> datasets?
>> I'll appreciate it.
>>
>> I would think that Spark should be able to pull off Hadoop level
>> throughput in worst case with DISK_ONLY caching.
>>
>> Thanks
>> On Oct 28, 2013 1:37 PM, "Koert Kuipers"  wrote:
>>
>>> i would say scaling (cascading + DSL for scala) offers similar
>>> functionality to spark, and a similar syntax.
>>> the main difference between spark and scalding is target jobs:
>>> scalding is for long running jobs on very large data. the data is
>>> read from and written to disk between steps. jobs run from minutes to 
>>> days.
>>> spark is for faster jobs on medium to large data. the data is
>>> primarily held in memory. jobs run from a few seconds to a few hours.
>>> although spark can work with data on disks it still makes assumptions 
>>> that
>>> data needs to fit in memory for certain steps (although less and less 
>>> with
>>> every release). spark also makes iterative designs mu

met a problem while running a streaming example program

2013-10-29 Thread dachuan
Hi,

I have tried the clickstream example; it runs into an exception. Has anybody
met this before?

Since the program mentions "local[2]", I ran it on my local machine.

thanks in advance,
dachuan.

Log Snippet 1:

13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks
from Stage 12 (MapPartitionsRDD[63] at combineByKey at
ShuffledDStream.scala:41)
13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230
bytes
13/10/29 08:50:25 INFO local.LocalScheduler: Running 75
13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0
13/10/29 08:50:25 INFO spark.CacheManager: Computing partition
org.apache.spark.rdd.BlockRDDPartition@0
13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed
13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to
java.io.NotSerializableException
java.io.NotSerializableException:
org.apache.spark.streaming.examples.clickstream.PageView

Log Snippet 2:
org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4
times; aborting job java.io.NotSerializableException:
org.apache.spark.streaming.examples.clickstream.PageView
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Two commands that run this app:
./run-example
org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10
./run-example
org.apache.spark.streaming.examples.clickstream.PageViewStream
errorRatePerZipCode localhost 4


Re: Task output before a shuffle

2013-10-29 Thread Ufuk Celebi
On 29 Oct 2013, at 02:47, Matei Zaharia  wrote:
> Yes, we still write out data after these tasks in Spark 0.8, and it needs to 
> be written out before any stage that reads it can start. The main reason is 
> simplicity when there are faults, as well as more flexible scheduling (you 
> don't have to decide where each reduce task is in advance, you can have more 
> reduce tasks than you have CPU cores, etc).

Thank you for the answer! I have a follow-up:

In which fraction (RDD or non-RDD) of the heap will the output be stored before 
spilling to disk?

I have a job where I read over a large data set once and don't persist
anything. Would it make sense to set "spark.storage.memoryFraction" to a 
smaller value in order to avoid spilling to disk?

- Ufuk
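
For reference, the setting being discussed is the fraction of the executor heap
reserved for cached RDDs. A sketch of lowering it, assuming the 0.8-era
system-property mechanism and treating 0.3 as a purely illustrative value (the
documented default at the time was around 0.66):

// Must be set before the SparkContext is created to take effect.
System.setProperty("spark.storage.memoryFraction", "0.3")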

Reading corrupted hadoop sequence files

2013-10-29 Thread Arun Kumar
Hi,

I am trying to read some hadoop sequence files. Some of the lines are not
being parsed and cause exceptions. This throws a global exception, the
job dies, and the files cannot be loaded. Is there a way to catch the
exception for each object and return null if the corresponding object could
not be formed? I have seen a solution for the Hadoop map-reduce framework,
like this (
http://stackoverflow.com/questions/14920236/how-to-prevent-hadoop-job-to-fail-on-corrupted-input-file).
How can I do this in Spark?

Thanks
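
One common pattern for the per-record case (a sketch under assumptions, not an
answer from this thread): wrap the parsing of each record in scala.util.Try
(Scala 2.10+) so a corrupt record is dropped instead of failing the job. Note
this only helps when the exception comes from user-level parsing code; it cannot
catch failures inside the Hadoop record reader itself. The path, writable types,
and parseRecord function are hypothetical:

import scala.util.Try
import org.apache.hadoop.io.{LongWritable, Text}

// Hypothetical per-record parser; throws on malformed lines (e.g. missing columns).
def parseRecord(s: String): (String, Int) = {
  val parts = s.split("\t")
  (parts(0), parts(1).toInt)
}

// sc is an existing SparkContext; the path is a placeholder.
val raw = sc.sequenceFile("hdfs://namenode/path/to/seqfiles",
                          classOf[LongWritable], classOf[Text])
val parsed = raw.flatMap { case (_, value) =>
  Try(parseRecord(value.toString)).toOption  // corrupt records become None and are dropped
}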