Mmm how many days worth of data/how deep is your data nesting?

I suspect your running into a current issue with parquet (a fix is in
master but I don't believe released yet..). It reads all the metadata to
the submitter node as part of scheduling the job. This can cause long start
times(timeouts too), and also requires a lot of memory so hence the OOM
with lower memory. The newer one reads the metadata per file on the task
reading that file. At least the hadoop stack is designed to do that on the
mappers. With how Spark works I expect the same improvement there.



On Mon, Sep 8, 2014 at 3:33 PM, Manu Mukerji <manu...@gmail.com> wrote:

> How big is the data set? Does it work when you copy it to hdfs?
>
> -Manu
>
>
> On Mon, Sep 8, 2014 at 2:58 PM, Jim Carroll <jimfcarr...@gmail.com> wrote:
>
>> Hello all,
>>
>> I've been wrestling with this problem all day and any suggestions would be
>> greatly appreciated.
>>
>> I'm trying to test reading a parquet file that's stored in s3 using a
>> spark
>> cluster deployed on ec2. The following works in the spark shell when run
>> completely locally on my own machine (i.e. no --master option passed to
>> the
>> spark-shell command):
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> import sqlContext._
>> val p = parquetFile("s3n://[bucket]/path-to-parquet-dir/")
>> p.registerAsTable("s")
>> sql("select count(*) from s").collect
>>
>> I have an ec2 deployment of spark (tried version 1.0.2 and 1.1.0-rc4)
>> using
>> the standalone cluster manager and deployed with the spark-ec2 script.
>>
>> Running the same code in a spark shell connected to the cluster it
>> basically
>> hangs on the select statement. The workers/slaves simply time out and
>> restart every 30 seconds when they hit what appears to be an activity
>> timeout, as if there's no activity from the spark-shell (based on what I
>> see
>> in the stderr logs for the job, I assume this is expected behavior when
>> connected from a spark-shell that's sitting idle).
>>
>> I see these messages about every 30 seconds:
>>
>> 14/09/08 17:43:08 WARN TaskSchedulerImpl: Initial job has not accepted any
>> resources; check your cluster UI to ensure that workers are registered and
>> have sufficient memory
>> 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated:
>> app-20140908213842-0002/7 is now EXITED (Command exited with code 1)
>> 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Executor
>> app-20140908213842-0002/7 removed: Command exited with code 1
>> 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor added:
>> app-20140908213842-0002/8 on
>> worker-20140908183422-ip-10-60-107-194.ec2.internal-53445
>> (ip-10-60-107-194.ec2.internal:53445) with 2 cores
>> 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Granted executor ID
>> app-20140908213842-0002/8 on hostPort ip-10-60-107-194.ec2.internal:53445
>> with 2 cores, 4.0 GB RAM
>> 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated:
>> app-20140908213842-0002/8 is now RUNNING
>>
>> Eventually it fails with a:
>>
>> 14/09/08 17:44:16 INFO AppClient$ClientActor: Executor updated:
>> app-20140908213842-0002/9 is now EXITED (Command exited with code 1)
>> 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Executor
>> app-20140908213842-0002/9 removed: Command exited with code 1
>> 14/09/08 17:44:16 ERROR SparkDeploySchedulerBackend: Application has been
>> killed. Reason: Master removed our application: FAILED
>> 14/09/08 17:44:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks
>> have all completed, from pool
>> 14/09/08 17:44:16 INFO TaskSchedulerImpl: Cancelling stage 1
>> 14/09/08 17:44:16 INFO DAGScheduler: Failed to run collect at
>> SparkPlan.scala:85
>> 14/09/08 17:44:16 INFO SparkUI: Stopped Spark web UI at
>> http://192.168.10.198:4040
>> 14/09/08 17:44:16 INFO DAGScheduler: Stopping DAGScheduler
>> 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Shutting down all
>> executors
>> 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor
>> to
>> shut down
>> 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor
>> to
>> shut down
>> org.apache.spark.SparkException: Job aborted due to stage failure: Master
>> removed our application: FAILED
>>         at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>>         at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>>         at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>>         at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at
>>
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>>         at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>>         at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>>         at scala.Option.foreach(Option.scala:236)
>>         at
>>
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>>         at
>>
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>         at
>>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>         at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at
>>
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at
>>
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> As far as the "Initial job has not accepted any resources" I'm running the
>> spark-shell command with:
>>
>> SPARK_MEM=2g ./spark-shell --master
>> spark://ec2-x-x-x-x.compute-1.amazonaws.com:7077
>>
>> According to the master web page each node has 6 Gig so I'm not sure why
>> I'm
>> seeing that message either. If I run with less than 2g I get the following
>> in my spark-shell:
>>
>> 14/09/08 17:47:38 INFO Remoting: Remoting shut down
>> 14/09/08 17:47:38 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
>> shut down.
>> java.io.IOException: Error reading summaries
>>         at
>>
>> parquet.hadoop.ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(ParquetFileReader.java:128)
>>         ....
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>
>> I'm not sure if this exception is from the spark-shell jvm or transferred
>> over from the master or a worker through the master.
>>
>> Any help would be greatly appreciated.
>>
>> Thanks
>> Jim
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

Reply via email to