Problem with changing the akka.framesize parameter

2015-02-04 Thread sahanbull
I am trying to run a spark application with 

-Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000
-Dspark.akka.frameSize=1 

and the job fails because one or more of the akka frames are larger than
1mb (12000 ish). 

When I change -Dspark.akka.frameSize=1 to 12000, 15000 and 2 and run:

./spark/bin/spark-submit  --driver-memory 30g --executor-memory 30g
mySparkCode.py

I get the following error at startup:


ERROR OneForOneStrategy: Cannot instantiate transport
[akka.remote.transport.netty.NettyTransport]. Make sure it extends
[akka.remote.transport.Transport] and has constructor with
[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters
java.lang.IllegalArgumentException: Cannot instantiate transport
[akka.remote.transport.netty.NettyTransport]. Make sure it extends
[akka.remote.transport.Transport] and has constructor with
[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters
at
akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)
at
akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
at
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at
akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
at
akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: requirement failed: Setting
'maximum-frame-size' must be at least 32000 bytes
at scala.Predef$.require(Predef.scala:233)
at
akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at
org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at
org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
at
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)
at org.apache.spark.SparkContext.init(SparkContext.scala:203)
at
org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

Can anyone give me a clue about what's going wrong here?

I am running Spark 1.1.0 on r3.2xlarge EC2 instances.
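One hedged note on the startup failure: in Spark 1.x, AkkaUtils reads spark.akka.frameSize as an Int number of megabytes and converts it to bytes with 32-bit arithmetic (this is an assumption about the 1.1.0 code path, not something stated in the post), so values above roughly 2047 overflow to a negative byte count and then fail Akka's "must be at least 32000 bytes" requirement. A small self-contained sketch of that arithmetic, with illustrative values:

object FrameSizeSketch {
  // mirrors the assumed conversion: megabytes -> bytes using Int math
  def frameSizeBytes(frameSizeMb: Int): Int = frameSizeMb * 1024 * 1024

  def main(args: Array[String]): Unit = {
    Seq(10, 1024, 2047, 2048, 12000, 15000).foreach { mb =>
      val bytes = frameSizeBytes(mb)
      println(s"frameSize=$mb MB -> $bytes bytes, passes 32000-byte check: ${bytes >= 32000}")
    }
  }
}

Values such as 12000 or 15000 therefore cannot work as-is; the configured frame size has to stay within the Int byte range after conversion.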

Re: Spark Job running on localhost on yarn cluster

2015-02-04 Thread Michael Albert
1) Parameters like --num-executors should come before the jar.  That is, you
want something like
$SPARK_HOME --num-executors 3 --driver-memory 6g --executor-memory 7g \
--master yarn-cluster --class EDDApp target/scala-2.10/eddjar \
outputPath
That is, *your* parameters come after the jar, spark's parameters come *before*
the jar. That's how spark knows which are which (at least that is my
understanding).
2) Double check that in your code, when you create the SparkContext or the
configuration object, you don't set "local" there (I don't recall the exact
order of priority if the parameters disagree with the code); see the sketch
below.
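A minimal Scala sketch of that second point, leaving the master unset in code so the --master flag from spark-submit takes effect; the object name and app name are placeholders, not taken from the original post:

import org.apache.spark.{SparkConf, SparkContext}

object EDDAppSketch {
  def main(args: Array[String]): Unit = {
    // no .setMaster("local[*]") here; let spark-submit decide the master
    val conf = new SparkConf().setAppName("EDDApp")
    val sc = new SparkContext(conf)
    // ... job logic ...
    sc.stop()
  }
}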
Good luck!
-Mike

  From: kundan kumar iitr.kun...@gmail.com
 To: spark users user@spark.apache.org 
 Sent: Wednesday, February 4, 2015 7:41 AM
 Subject: Spark Job running on localhost on yarn cluster
   
Hi, 
I am trying to execute my code on a yarn cluster
The command which I am using is 
$SPARK_HOME/bin/spark-submit --class EDDApp 
target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster 
--num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath

But, I can see that this program is running only on the localhost.
It's able to read the file from hdfs.
I have tried this in standalone mode and it works fine.
Please suggest where it is going wrong.

Regards,
Kundan

  

Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-04 Thread Imran Rashid
Hi Michael,

judging from the logs, it seems that those tasks are just working a really
long time.  If you have long running tasks, then you wouldn't expect the
driver to output anything while those tasks are working.

What is unusual is that there is no activity during all that time the tasks
are executing.  Are you sure you are looking at the activity of the
executors (the nodes that are actually running the tasks), and not the
activity of the driver node (the node where your main program lives, but
that doesn't do any of the distributed computation)?  It would be perfectly
normal for the driver node to be idle while all the executors were busy
with long running tasks.

I would look at:
(a) the cpu usage etc. of the executor nodes during those long running tasks
(b) the thread dumps of the executors during those long running tasks
(available via the UI under the Executors tab, or just log into the boxes
and run jstack).  Ideally this will point out a hotspot in your code that
is making these tasks take so long.  (Or perhaps it'll point out what is
going on in spark internals that is so slow)
(c) the summary metrics for the long running stage, when it finally
finishes (also available in the UI, under the Stages tab).  You will get
a breakdown of how much time is spent in various phases of the tasks, how
much data is read, etc., which can help you figure out why tasks are slow


Hopefully this will help you find out what is taking so long.  If you find
out the executors really aren't doing anything during these really long
tasks, it would be great to find that out, and maybe get some more info for
a bug report.

Imran


On Tue, Feb 3, 2015 at 6:18 PM, Michael Albert 
m_albert...@yahoo.com.invalid wrote:

 Greetings!

 First, my sincere thanks to all who have given me advice.
 Following previous discussion, I've rearranged my code to try to keep the
 partitions to more manageable sizes.
 Thanks to all who commented.

 At the moment, the input set I'm trying to work with is about 90GB (avro
 parquet format).

 When I run on a reasonable chunk of the data (say half) things work
 reasonably.

 On the full data, the spark process stalls.
 That is, for about 1.5 hours out of a 3.5 hour run, I see no activity.
 No cpu usage, no error message, no network activity.
 It just seems to sit there.
 The messages bracketing the stall are shown below.

 Any advice on how to diagnose this?
 I don't get any error messages.
 The spark UI says that it is running a stage, but it makes no discernible
 progress.
 Ganglia shows no CPU usage or network activity.
 When I shell into the worker nodes there are no filled disks or other
 obvious problems.

 How can I discern what Spark is waiting for?

 The only weird thing seen, other than the stall, is that the yarn logs on
 the workers have lines with messages like this:
 2015-02-03 22:59:58,890 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 13158 for container-id
 container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory
 used; 7.6 GB of 42.5 GB virtual memory used

 It's rather strange that it mentions 42.5 GB of virtual memory.  The
 machines are EMR machines with 32 GB of physical memory and, as far as I
 can determine, no swap space.

 The messages bracketing the stall are shown below.


 Any advice is welcome.

 Thanks!

 Sincerely,
  Mike Albert

 Before the stall.
 15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed TaskSet
 5.0, whose tasks have all completed, from pool
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5
 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable
 stages
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6, Stage
 7, Stage 8)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 6: List(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 7: List(Stage 6)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 8: List(Stage 7)
 At this point, I see no activity for 1.5 hours except for this (XXX for
 I.P. address)
 15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor:
 akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

 Then finally it started again:
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in
 stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in
 stage 3.0 (TID 7300) in 7208503 ms on ip-10-171-0-128.ec2.internal (4/4)
 15/02/03 23:31:34 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at
 Transposer.scala:211) finished in 7209.534 s






Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread bo yang
Hi Corey,

I see. Thanks for making it clear. I may be lucky not hitting code path of
such Guava classes. But I hit some other jar conflicts when using Spark to
connect to AWS S3. Then I had to manually try each version of
org.apache.httpcomponents until I found a proper old version.

Another suggestion is to build Spark by yourself.

Anyway, would like to see your update once you figure out the solution.

Best wishes!
Bo



On Wed, Feb 4, 2015 at 4:47 AM, Corey Nolet cjno...@gmail.com wrote:

 Bo yang-

 I am using Spark 1.2.0 and undoubtedly there are older Guava classes which
 are being picked up and serialized with the closures when they are sent
 from the driver to the executors because the class serial version ids don't
 match from the driver to the executors. Have you tried doing this? Guava
 works fine for me when this is not the case- but as soon as a Guava class
 which was changed from version 15.0 is serialized, it fails. See [1] for more
 info - we did fairly extensive testing last night. I've isolated the issue
 to Hadoop's really old version of Guava being picked up. Again, this is
 only noticeable when classes are used from Guava 15.0 that were changed
 from previous versions and those classes are being serialized on the driver
 and shipped to the executors.


 [1] https://github.com/calrissian/mango/issues/158

 On Wed, Feb 4, 2015 at 1:31 AM, bo yang bobyan...@gmail.com wrote:

 Corey,

 Which version of Spark do you use? I am using Spark 1.2.0, and  guava
 15.0. It seems fine.

 Best,
 Bo


 On Tue, Feb 3, 2015 at 8:56 PM, M. Dale medal...@yahoo.com.invalid
 wrote:

  Try spark.yarn.user.classpath.first (see
 https://issues.apache.org/jira/browse/SPARK-2996 - only works for
 YARN). Also thread at
 http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html
 .

 HTH,
 Markus

 On 02/03/2015 11:20 PM, Corey Nolet wrote:

 I'm having a really bad dependency conflict right now with Guava
 versions between my Spark application in Yarn and (I believe) Hadoop's
 version.

  The problem is, my driver has the version of Guava which my
 application is expecting (15.0) while it appears the Spark executors that
 are working on my RDDs have a much older version (assuming it's the old
 version on the Hadoop classpath).

  Is there a property like 'mapreduce.job.user.classpath.first' that I
 can set to make sure my own classpath is established first on the executors?







Re: 2GB limit for partitions?

2015-02-04 Thread Mridul Muralidharan
That work is from more than a year back and is not maintained anymore
since we do not use it in-house now.
Also note that there have been quite a lot of changes in spark ...
including some which break assumptions made in the patch, so its value is
very low - having said that, do feel free to work on the jira and/or use
the patch if it helps!

Regards
Mridul

On Wednesday, February 4, 2015, Imran Rashid iras...@cloudera.com wrote:

 Hi Mridul,


 do you think you'll keep working on this, or should this get picked up by
 others?  Looks like there was a lot of work put into LargeByteBuffer, seems
 promising.

 thanks,
 Imran

  On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com wrote:

 That is fairly out of date (we used to run some of our jobs on it ... But
 that is forked off 1.1 actually).

 Regards
 Mridul


  On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

 Thanks for the explanations, makes sense.  For the record looks like this
 was worked on a while back (and maybe the work is even close to a
 solution?)

 https://issues.apache.org/jira/browse/SPARK-1476

 and perhaps an independent solution was worked on here?

 https://issues.apache.org/jira/browse/SPARK-1391


 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

  cc dev list
 
 
  How are you saving the data? There are two relevant 2GB limits:
 
  1. Caching
 
  2. Shuffle
 
 
  For caching, a partition is turned into a single block.
 
  For shuffle, each map partition is partitioned into R blocks, where R =
   number of reduce tasks. It is unlikely a shuffle block is > 2G, although it
   can still happen.
 
  I think the 2nd problem is easier to fix than the 1st, because we can
  handle that in the network transport layer. It'd require us to divide
 the
  transfer of a very large block into multiple smaller blocks.
 
 
 
  On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com
 wrote:
 
  Michael,
 
  you are right, there is definitely some limit at 2GB.  Here is a
 trivial
  example to demonstrate it:
 
  import org.apache.spark.storage.StorageLevel
   val d = sc.parallelize(1 to 1e6.toInt, 1).map{ i => new
   Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
  d.count()
 
  It gives the same error you are observing.  I was under the same
  impression as Sean about the limits only being on blocks, not
 partitions --
  but clearly that isn't the case here.
 
  I don't know the whole story yet, but I just wanted to at least let
 you
  know you aren't crazy :)
  At the very least this suggests that you might need to make smaller
  partitions for now.
 
  Imran
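A hedged sketch of that smaller-partitions workaround, reusing the repro above: spreading the same ~5 GB over 200 partitions (an illustrative number) keeps each cached block around 25 MB, well under the 2 GB limit:

import org.apache.spark.storage.StorageLevel

val d = sc.parallelize(1 to 1e6.toInt, 200)
  .map(i => new Array[Byte](5e3.toInt))
  .persist(StorageLevel.DISK_ONLY)
d.count()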
 
 
  On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
  m_albert...@yahoo.com.invalid wrote:
 
  Greetings!
 
  Thanks for the response.
 
  Below is an example of the exception I saw.
  I'd rather not post code at the moment, so I realize it is completely
  unreasonable to ask for a diagnosis.
  However, I will say that adding a partitionBy() was the last change
  before this error was created.
 
 
  Thanks for your time and any thoughts you might have.
 
  Sincerely,
   Mike
 
 
 
  Exception in thread main org.apache.spark.SparkException: Job
 aborted
  due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
  failure: Lost task 4.3 in stage 5.0 (TID 6012,
  ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at
 
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
  scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at
 scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 
 
--
   *From:* Sean Owen so...@cloudera.com
  *To:* Michael Albert m_albert...@yahoo.com
  *Cc:* user@spark.apache.org user@spark.apache.org
  *Sent:* Monday, February 2, 2015 10:13 PM
  

Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-04 Thread Sandy Ryza
Also, do you see any lines in the YARN NodeManager logs where it says that
it's killing a container?

-Sandy

On Wed, Feb 4, 2015 at 8:56 AM, Imran Rashid iras...@cloudera.com wrote:

 Hi Michael,

 judging from the logs, it seems that those tasks are just working a really
 long time.  If you have long running tasks, then you wouldn't expect the
 driver to output anything while those tasks are working.

 What is unusual is that there is no activity during all that time the
 tasks are executing.  Are you sure you are looking at the activity of the
 executors (the nodes that are actually running the tasks), and not the
 activity of the driver node (the node where your main program lives, but
 that doesn't do any of the distributed computation)?  It would be perfectly
 normal for the driver node to be idle while all the executors were busy
 with long running tasks.

 I would look at:
 (a) the cpu usage etc. of the executor nodes during those long running
 tasks
 (b) the thread dumps of the executors during those long running tasks
 (available via the UI under the Executors tab, or just log into the boxes
 and run jstack).  Ideally this will point out a hotspot in your code that
 is making these tasks take so long.  (Or perhaps it'll point out what is
 going on in spark internals that is so slow)
 (c) the summary metrics for the long running stage, when it finally
 finishes (also available in the UI, under the Stages tab).  You will get
 a breakdown of how much time is spent in various phases of the tasks, how
 much data is read, etc., which can help you figure out why tasks are slow


 Hopefully this will help you find out what is taking so long.  If you find
 out the executors really arent' doing anything during these really long
 tasks, it would be great to find that out, and maybe get some more info for
 a bug report.

 Imran


 On Tue, Feb 3, 2015 at 6:18 PM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 First, my sincere thanks to all who have given me advice.
 Following previous discussion, I've rearranged my code to try to keep the
 partitions to more manageable sizes.
 Thanks to all who commented.

 At the moment, the input set I'm trying to work with is about 90GB (avro
 parquet format).

 When I run on a reasonable chunk of the data (say half) things work
 reasonably.

 On the full data, the spark process stalls.
 That is, for about 1.5 hours out of a 3.5 hour run, I see no activity.
 No cpu usage, no error message, no network activity.
  It just seems to sit there.
 The messages bracketing the stall are shown below.

 Any advice on how to diagnose this?
 I don't get any error messages.
 The spark UI says that it is running a stage, but it makes no discernible
 progress.
 Ganglia shows no CPU usage or network activity.
 When I shell into the worker nodes there are no filled disks or other
 obvious problems.

 How can I discern what Spark is waiting for?

 The only weird thing seen, other than the stall, is that the yarn logs on
 the workers have lines with messages like this:
 2015-02-03 22:59:58,890 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 13158 for container-id
 container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory
 used; 7.6 GB of 42.5 GB virtual memory used

 It's rather strange that it mentions 42.5 GB of virtual memory.  The
 machines are EMR machines with 32 GB of physical memory and, as far as I
 can determine, no swap space.

 The messages bracketing the stall are shown below.


 Any advice is welcome.

 Thanks!

 Sincerely,
  Mike Albert

 Before the stall.
 15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed
 TaskSet 5.0, whose tasks have all completed, from pool
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5
 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable
 stages
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6,
 Stage 7, Stage 8)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 6: List(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 7: List(Stage 6)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 8: List(Stage 7)
 At this point, I see no activity for 1.5 hours except for this (XXX for
 I.P. address)
 15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor:
 akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

 Then finally it started again:
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in
 stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in
 

Re: How many stages in my application?

2015-02-04 Thread Akhil Das
You can easily understand the flow by looking at the number of operations
in your program (like map, groupBy, join etc.). First, list out the
operations happening in your application; then, from the web UI, you will be
able to see how many operations have happened so far.

Thanks
Best Regards

On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote:

 I'm sitting here looking at my application crunching gigabytes of data on
 a cluster and I have no idea if it's an hour away from completion or a
 minute. The web UI shows progress through each stage, but not how many
 stages remaining. How can I work out how many stages my program will take
 automatically?

 My application has a slightly interesting DAG (re-use of functions that
 contain Spark transformations, persistent RDDs). Not that complex, but not
 'step 1, step 2, step 3'.

 I'm guessing that if the driver program runs sequentially sending messages
 to Spark, then Spark has no knowledge of the structure of the driver
 program. Therefore it's necessary to execute it on a small test dataset and
 see how many stages result?

 When I set spark.eventLog.enabled = true and run on (very small) test data
 I don't get any stage messages in my STDOUT or in the log file. This is on
 a `local` instance.

 Did I miss something obvious?

 Thanks!

 Joe



Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-04 Thread Kelvin Chu
Joe, I also use S3 and gzip. So far the I/O is not a problem. In my case,
the operation is SQLContext.JsonFile() and I can see from Ganglia that the
whole cluster is CPU bound (99% saturated). I have 160 cores and I can see
the network can sustain about 150MBit/s.

Kelvin

On Wed, Feb 4, 2015 at 10:20 AM, Aaron Davidson ilike...@gmail.com wrote:

 The latter would be faster. With S3, you want to maximize number of
 concurrent readers until you hit your network throughput limits.

 On Wed, Feb 4, 2015 at 6:20 AM, Peter Rudenko petro.rude...@gmail.com
 wrote:

   Hi, if I have a 10GB file on S3 and set 10 partitions, would the whole file
  be downloaded on the master first and broadcast, or would each worker just
  read its range from the file?

 Thanks,
 Peter

 On 2015-02-03 23:30, Sven Krasser wrote:

  Hey Joe,

 With the ephemeral HDFS, you get the instance store of your worker nodes.
 For m3.xlarge that will be two 40 GB SSDs local to each instance, which are
 very fast.

  For the persistent HDFS, you get whatever EBS volumes the launch script
 configured. EBS volumes are always network drives, so the usual limitations
 apply. To optimize throughput, you can use EBS volumes with provisioned
 IOPS and you can use EBS optimized instances. I don't have hard numbers at
 hand, but I'd expect this to be noticeably slower than using local SSDs.

 As far as only using S3 goes, it depends on your use case (i.e. what you
 plan on doing with the data while it is there). If you store it there in
 between running different applications, you can likely work around
 consistency issues.

 Also, if you use Amazon's EMRFS to access data in S3, you can use their
 new consistency feature (
 https://aws.amazon.com/blogs/aws/emr-consistent-file-system/).

 Hope this helps!
 -Sven


 On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass jw...@crossref.org wrote:

 The data is coming from S3 in the first place, and the results will be
 uploaded back there. But even in the same availability zone, fetching 170
 GB (that's gzipped) is slow. From what I understand of the pipelines,
 multiple transforms on the same RDD might involve re-reading the input,
 which very quickly add up in comparison to having the data locally. Unless
 I persisted the data (which I am in fact doing) but that would involve
 storing approximately the same amount of data in HDFS, which wouldn't fit.

   Also, I understood that S3 was unsuitable for practical use? See Why you
  cannot use S3 as a replacement for HDFS [0]. I'd love to be proved wrong,
  though; that would make things a lot easier.

  [0] http://wiki.apache.org/hadoop/AmazonS3



 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net
 wrote:

 You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

 I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
 disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs
 up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



  -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 http://sites.google.com/site/krasser/?utm_source=sig






Re: How many stages in my application?

2015-02-04 Thread Mark Hamstra
But there isn't a 1-1 mapping from operations to stages since multiple
operations will be pipelined into a single stage if no shuffle is
required.  To determine the number of stages in a job you really need to be
looking for shuffle boundaries.
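A hedged illustration of counting by shuffle boundaries (the input path is a placeholder): the flatMap and map below are pipelined into the first stage, and the reduceByKey shuffle starts the second, so the job runs as two stages no matter how many narrow operations are chained:

val counts = sc.textFile("hdfs:///input/words.txt")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))   // still the first stage: no shuffle yet
  .reduceByKey(_ + _)       // shuffle boundary -> second stage
counts.count()              // action that triggers the two-stage job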

On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can easily understand the flow by looking at the number of operations
 in your program (like map, groupBy, join etc.), first of all you list out
 the number of operations happening in your application and then from the
 webui you will be able to see how many operations have happened so far.

 Thanks
 Best Regards

 On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote:

 I'm sitting here looking at my application crunching gigabytes of data on
 a cluster and I have no idea if it's an hour away from completion or a
 minute. The web UI shows progress through each stage, but not how many
 stages remaining. How can I work out how many stages my program will take
 automatically?

 My application has a slightly interesting DAG (re-use of functions that
 contain Spark transformations, persistent RDDs). Not that complex, but not
 'step 1, step 2, step 3'.

 I'm guessing that if the driver program runs sequentially sending
 messages to Spark, then Spark has no knowledge of the structure of the
 driver program. Therefore it's necessary to execute it on a small test
 dataset and see how many stages result?

 When I set spark.eventLog.enabled = true and run on (very small) test
 data I don't get any stage messages in my STDOUT or in the log file. This
 is on a `local` instance.

 Did I miss something obvious?

 Thanks!

 Joe





Re: Spark streaming app shutting down

2015-02-04 Thread Akhil Das
AFAIK, from Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault
tolerance, which means it can handle the receiver/driver failures. You can
also look at the low-level kafka consumer
https://github.com/dibbhatt/kafka-spark-consumer which has a better fault
tolerance mechanism for receiver failures. This low level consumer will
push the offset of the message being read into zookeeper for fault
tolerance. In your case I think mostly the in-flight data would be lost if
you aren't using any of the fault tolerance mechanisms.

Thanks
Best Regards
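A minimal sketch of enabling that write ahead log; the config key is assumed from the Spark 1.2 streaming documentation, and the checkpoint path is a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-to-hbase")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // assumed key name
val ssc = new StreamingContext(conf, Seconds(60))
ssc.checkpoint("hdfs:///tmp/spark/checkpoints") // WAL data is written under the checkpoint directory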

On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com wrote:

 Hello Sprakans,

 I'm running a spark streaming app which reads data from kafka topic does
 some processing and then persists the results in HBase.

 I am using spark 1.2.0 running on a Yarn cluster with 3 executors (2gb, 8
 cores each). I've enabled checkpointing & I am also rate limiting my
 kafkaReceivers so that the number of items read is not more than 10 records
 per sec.
 The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.

 This app was running fine for ~3 days then there was an increased load on
 the HBase server because of some other process querying HBase tables.
 This led to increase in the batch processing time of the spark batches
 (processed 1 min batch in 10 min) which previously was finishing in 20 sec
 which in turn led to the shutdown of the spark application, PFA the
 executor logs.

 From the logs I'm getting the below exceptions *[1]* & *[2]*; it looks like there
 were some outstanding Jobs that didn't get processed or the Job couldn't
 find the input data. From the logs it seems that the shutdown hook
 gets invoked but it cannot process the in-flight block.

 I have a couple of queries on this
   1) Does this mean that these jobs failed and the *in-flight data *is
 lost?
   2) Does the Spark job *buffers kafka* input data while the Job is under
 processing state for 10 mins and on shutdown is that too lost? (I do not
 see any OOM error in the logs).
   3) Can we have *explicit commits* enabled in the kafkaReceiver so that
 the offsets gets committed only when the RDD(s) get successfully processed?

 Also I'd like to know if there is a *graceful way to shutdown a spark app
 running on yarn*. Currently I'm killing the yarn app to stop it, which
 leads to loss of that job's history, whereas in this case the application
 stops and succeeds and thus preserves the logs & history.

 *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still have
 1 requests outstanding when connection from
 hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
 *[2]* java.lang.Exception: Could not compute split, block
 input-2-1422901498800 not found
 *[3]* 
 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode
 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63
 does not have any open files.

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-04 Thread Michael Albert
Greetings!
Thanks to all who have taken the time to look at this.
While the process is stalled, I see, in the yarn log on the head node,
repeating messages of the form "Trying to fulfill reservation for application
XXX on node YYY", but that node is reserved by XXX_01.  Below is a chunk
of the log.
Any suggestions as to what I can investigate?
Thanks!
-Mike
2015-02-04 18:18:28,617 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-124.ec2.internal:9103
2015-02-04 18:18:28,617 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-124.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true} type=OFF_SWITCH
2015-02-04 18:18:28,617 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_25 on node host: ip-10-171-0-124.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,618 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26 usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,618 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-124.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01
2015-02-04 18:18:28,623 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): default usedResources: memory:226304, vCores:26 clusterResources: memory:230400, vCores:160 currentCapacity 0.982 required memory:8704, vCores:1 potentialNewCapacity: 1.02 ( max-capacity: 1.0)
2015-02-04 18:18:28,625 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-119.ec2.internal:9103
2015-02-04 18:18:28,625 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-119.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true} type=OFF_SWITCH
2015-02-04 18:18:28,626 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_26 on node host: ip-10-171-0-119.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,626 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26 usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,626 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-119.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-129.ec2.internal:9103
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-129.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true}

Re: Parquet compression codecs not applied

2015-02-04 Thread Ayoub
I was using a hive context and not a sql context, therefore (SET
spark.sql.parquet.compression.codec=gzip) was ignored.

Michael Armbrust pointed out that parquet.compression should be used
instead, which solved the issue.

I am still wondering if this behavior is normal; it would be better if
spark.sql.parquet.compression.codec were translated to
parquet.compression in the case of a hive context.
Otherwise the documentation should be updated to be more precise.
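A minimal sketch of the two settings side by side, assuming hiveContext and sqlContext are already created:

// with a HiveContext, set the Hive/Parquet property directly
hiveContext.sql("SET parquet.compression=GZIP")

// the spark.sql.* key that only the plain SQLContext path appears to honour
sqlContext.sql("SET spark.sql.parquet.compression.codec=gzip")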



2015-02-04 19:13 GMT+01:00 sahanbull sa...@skimlinks.com:

 Hi Ayoub,

 You could try using the sql format to set the compression type:

 sc = SparkContext()
 sqc = SQLContext(sc)
  sqc.sql("SET spark.sql.parquet.compression.codec=gzip")

  You get a notification on screen while running the spark job when you set
  the compression codec like this. I haven't compared it with different
  compression methods; please let the mailing list know if this works for
  you.

 Best
 Sahan



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058p21498.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Parquet-compression-codecs-not-applied-tp21499.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Parquet compression codecs not applied

2015-02-04 Thread sahanbull
Hi Ayoub,

You could try using the sql format to set the compression type:

sc = SparkContext()
sqc = SQLContext(sc)
sqc.sql("SET spark.sql.parquet.compression.codec=gzip")

You get a notification on screen while running the spark job when you set
the compression codec like this. I haven't compared it with different
compression methods; please let the mailing list know if this works for
you.

Best
Sahan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058p21498.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Marcelo Vanzin
Hi Corey,

When you run on Yarn, Yarn's libraries are placed in the classpath,
and they have precedence over your app's. So, with Spark 1.2, you'll
get Guava 11 in your classpath (with Spark 1.1 and earlier you'd get
Guava 14 from Spark, so still a problem for you).

Right now, the option Markus mentioned
(spark.yarn.user.classpath.first) can be a workaround for you, since
it will place your app's jars before Yarn's on the classpath.
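A minimal sketch of setting that flag programmatically (it can equally be passed as --conf on spark-submit); the app name is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("guava15-app")
  .set("spark.yarn.user.classpath.first", "true") // place the app's jars first on the YARN containers' classpath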


On Tue, Feb 3, 2015 at 8:20 PM, Corey Nolet cjno...@gmail.com wrote:
 I'm having a really bad dependency conflict right now with Guava versions
 between my Spark application in Yarn and (I believe) Hadoop's version.

 The problem is, my driver has the version of Guava which my application is
 expecting (15.0) while it appears the Spark executors that are working on my
 RDDs have a much older version (assuming it's the old version on the Hadoop
 classpath).

 Is there a property like 'mapreduce.job.user.classpath.first' that I can set
 to make sure my own classpath is established first on the executors?



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-04 Thread Aaron Davidson
The latter would be faster. With S3, you want to maximize number of
concurrent readers until you hit your network throughput limits.
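A hedged sketch of what that looks like from the driver (bucket and key are placeholders): each requested partition corresponds to a byte range of the file, and the executor running that task reads its range directly, so nothing is pulled through the master first:

val lines = sc.textFile("s3n://my-bucket/big-file.txt", 10) // minPartitions = 10
println(lines.partitions.length) // each split is read independently by its task

(Note that a single gzipped file is not splittable, so in that case one task would read the whole file regardless of the requested partition count.)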

On Wed, Feb 4, 2015 at 6:20 AM, Peter Rudenko petro.rude...@gmail.com
wrote:

   Hi, if I have a 10GB file on S3 and set 10 partitions, would the whole file
  be downloaded on the master first and broadcast, or would each worker just
  read its range from the file?

 Thanks,
 Peter

 On 2015-02-03 23:30, Sven Krasser wrote:

  Hey Joe,

 With the ephemeral HDFS, you get the instance store of your worker nodes.
 For m3.xlarge that will be two 40 GB SSDs local to each instance, which are
 very fast.

  For the persistent HDFS, you get whatever EBS volumes the launch script
 configured. EBS volumes are always network drives, so the usual limitations
 apply. To optimize throughput, you can use EBS volumes with provisioned
 IOPS and you can use EBS optimized instances. I don't have hard numbers at
 hand, but I'd expect this to be noticeably slower than using local SSDs.

 As far as only using S3 goes, it depends on your use case (i.e. what you
 plan on doing with the data while it is there). If you store it there in
 between running different applications, you can likely work around
 consistency issues.

 Also, if you use Amazon's EMRFS to access data in S3, you can use their
 new consistency feature (
 https://aws.amazon.com/blogs/aws/emr-consistent-file-system/).

 Hope this helps!
 -Sven


 On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass jw...@crossref.org wrote:

 The data is coming from S3 in the first place, and the results will be
 uploaded back there. But even in the same availability zone, fetching 170
 GB (that's gzipped) is slow. From what I understand of the pipelines,
 multiple transforms on the same RDD might involve re-reading the input,
 which very quickly add up in comparison to having the data locally. Unless
 I persisted the data (which I am in fact doing) but that would involve
 storing approximately the same amount of data in HDFS, which wouldn't fit.

   Also, I understood that S3 was unsuitable for practical use? See Why you
  cannot use S3 as a replacement for HDFS [0]. I'd love to be proved wrong,
  though; that would make things a lot easier.

  [0] http://wiki.apache.org/hadoop/AmazonS3



 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net
 wrote:

 You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

 I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
 disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs
 up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



  -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 http://sites.google.com/site/krasser/?utm_source=sig





Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Koert Kuipers
the whole spark.files.userClassPathFirst never really worked for me in
standalone mode, since jars were added dynamically which means they had
different classloaders, leading to a real classloader hell if you tried to
add a newer version of a jar that spark already used. see:
https://issues.apache.org/jira/browse/SPARK-1863

do i understand it correctly that on yarn the custom jars are truly
placed before the yarn and spark jars on the classpath? meaning at container
construction time, on the same classloader? that would be great news for
me. it would open up the possibility of using newer versions of many
libraries.


On Wed, Feb 4, 2015 at 1:12 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Corey,

 When you run on Yarn, Yarn's libraries are placed in the classpath,
 and they have precedence over your app's. So, with Spark 1.2, you'll
 get Guava 11 in your classpath (with Spark 1.1 and earlier you'd get
 Guava 14 from Spark, so still a problem for you).

 Right now, the option Markus mentioned
 (spark.yarn.user.classpath.first) can be a workaround for you, since
 it will place your app's jars before Yarn's on the classpath.


 On Tue, Feb 3, 2015 at 8:20 PM, Corey Nolet cjno...@gmail.com wrote:
  I'm having a really bad dependency conflict right now with Guava versions
  between my Spark application in Yarn and (I believe) Hadoop's version.
 
  The problem is, my driver has the version of Guava which my application
 is
  expecting (15.0) while it appears the Spark executors that are working
 on my
  RDDs have a much older version (assuming it's the old version on the
 Hadoop
  classpath).
 
  Is there a property like mapreduce.job.user.classpath.first' that I can
 set
  to make sure my own classpath is extablished first on the executors?



 --
 Marcelo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Corey Nolet
My mistake Marcello, I was looking at the wrong message. That reply was
meant for bo yang.
On Feb 4, 2015 4:02 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Corey,

 On Wed, Feb 4, 2015 at 12:44 PM, Corey Nolet cjno...@gmail.com wrote:
  Another suggestion is to build Spark by yourself.
 
  I'm having trouble seeing what you mean here, Marcelo. Guava is already
  shaded to a different package for the 1.2.0 release. It shouldn't be
 causing
  conflicts.

 That wasn't my suggestion and I definitely do not recommend rebuilding
 Spark to work around these issues. :-)

 --
 Marcelo



Re: How to get Hive table schema using Spark SQL or otherwise

2015-02-04 Thread Michael Armbrust
sqlContext.table(tableName).schema()
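A short sketch of pulling the column names and types out of the returned StructType (the table name is a placeholder; API as in Spark 1.2):

val schema = sqlContext.table("my_table").schema
schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))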

On Wed, Feb 4, 2015 at 1:07 PM, Ayoub benali.ayoub.i...@gmail.com wrote:

 Given a hive context you could execute:
 hiveContext.sql(describe TABLE_NAME) you would get the name of the
 fields and their types

 2015-02-04 21:47 GMT+01:00 nitinkak001 [hidden email]:

 I want to get a Hive table schema details into Spark. Specifically, I
 want to
 get column name and type information. Is it possible to do it e.g using
 JavaSchemaRDD or something else?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-Hive-table-schema-using-Spark-SQL-or-otherwise-tp21501.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

  -
  To unsubscribe, e-mail: [hidden email]
  For additional commands, e-mail: [hidden email]



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-How-to-get-Hive-table-schema-using-Spark-SQL-or-otherwise-tp21502.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Large # of tasks in groupby on single table

2015-02-04 Thread Manoj Samel
Spark 1.2
Data is read from parquet with 2 partitions and is cached as a table with 2
partitions. Verified in the UI that it shows an RDD with 2 partitions & that
it is fully cached in memory.

Cached data contains column a, b, c. Column a has ~150 distinct values.

Next run SQL on this table as select a, sum(b), sum(c) from table x

The query creates 200 tasks. Further, the advanced metric "scheduler delay"
is a significant % for most of these tasks. This seems like very high overhead
for a query on an RDD with 2 partitions.

It seems that if this were run with a smaller number of tasks, the query
should run faster?

Any thoughts on how to control # of partitions for the group by (or other
SQLs) ?

Thanks,
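One hedged possibility: Spark SQL shuffle stages default to spark.sql.shuffle.partitions = 200 regardless of the input RDD's 2 partitions, which would match the 200 tasks observed; lowering it (the value below is only illustrative) should shrink the task count for the group by:

sqlContext.setConf("spark.sql.shuffle.partitions", "8")
// or equivalently
sqlContext.sql("SET spark.sql.shuffle.partitions=8")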


Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Koert Kuipers
marcelo,
i was not aware of those fixes. its a fulltime job to keep up with spark...
i will take another look. it would be great if that works on spark
standalone also and resolves the issues i experienced before.

about putting stuff on classpath before spark or yarn... yeah you can shoot
yourself in the foot with it, but since the container is isolated it should
be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with
great success.

with the ability to put our own classes first and support for security yarn
now seems more attractive than standalone to me for many
applications/situations. never thought i would say that.

best

On Wed, Feb 4, 2015 at 4:01 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Koert,

 On Wed, Feb 4, 2015 at 11:35 AM, Koert Kuipers ko...@tresata.com wrote:
  do i understand it correctly that on yarn the the customer jars are truly
  placed before the yarn and spark jars on classpath? meaning at container
  construction time, on the same classloader? that would be great news for
 me.
  it would open up the possibility of using newer versions of many
 libraries.

 That's correct, the Yarn setting places the user's jars in the system
 classpath before Spark/Hadoop jars, so they can override classes
 needed by Spark/Hadoop.

 That's the main reason why it's not documented and not suggested
 unless there's no other workaround. Because you're potentially
 overriding classes that might break Spark, Hadoop or something else
 that's packaged with those. But if it works for your case, that's
 great.

 As for the userClassPath first thing, I've made some changes to the
 class loaders as part of implementing that option for Yarn [1], and
 someone also made similar changes in isolation [2]. So maybe the
 issues you were running into are fixed by either of those? In the
 future, it would be great to be able to declare that feature stable,
 since I believe it's a better alternative to overriding libraries that
 Spark or Hadoop depend on.

 [1] https://github.com/apache/spark/pull/3233
 [2] https://github.com/apache/spark/pull/3725

 --
 Marcelo



Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Marcelo Vanzin
Hi Koert,

On Wed, Feb 4, 2015 at 11:35 AM, Koert Kuipers ko...@tresata.com wrote:
 do i understand it correctly that on yarn the the customer jars are truly
 placed before the yarn and spark jars on classpath? meaning at container
 construction time, on the same classloader? that would be great news for me.
 it would open up the possibility of using newer versions of many libraries.

That's correct, the Yarn setting places the user's jars in the system
classpath before Spark/Hadoop jars, so they can override classes
needed by Spark/Hadoop.

That's the main reason why it's not documented and not suggested
unless there's no other workaround. Because you're potentially
overriding classes that might break Spark, Hadoop or something else
that's packaged with those. But if it works for your case, that's
great.

As for the userClassPath first thing, I've made some changes to the
class loaders as part of implementing that option for Yarn [1], and
someone also made similar changes in isolation [2]. So maybe the
issues you were running into are fixed by either of those? In the
future, it would be great to be able to declare that feature stable,
since I believe it's a better alternative to overriding libraries that
Spark or Hadoop depend on.

[1] https://github.com/apache/spark/pull/3233
[2] https://github.com/apache/spark/pull/3725

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Why are task results large in this case?

2015-02-04 Thread ankits
I am running a job, part of which is to add some null values to the rows of
a SchemaRDD. The job fails with "Total size of serialized results of 2692
tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)".

This is the code:

val in = sqc.parquetFile(...)
..
val presentColProj: SchemaRDD = in.select(symbolList : _*)

val nullSeq:Broadcast[Seq[_]] =
sc.broadcast(Seq.fill(missingColNames.size)(null))

val nullPaddedProj: RDD[Row] = presentColProj.map { row => Row.fromSeq(
  Row.unapplySeq(row).get ++ nullSeq.value) }

..

sqc.applySchema(nullPaddedProj, newSchema)

I believe it is failing on the map. Is the size of the serialized result
large because of the rows in the map? Is there a better way to add some null
columns to a SchemaRDD? Any insight would be appreciated.
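A hedged sketch that treats the symptom rather than the cause: the limit named in the error can be raised (or set to 0 for unlimited) before the SparkContext is created:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.maxResultSize", "2g") // default is 1g; "0" disables the limit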




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-are-task-results-large-in-this-case-tp21503.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get Hive table schema using Spark SQL or otherwise

2015-02-04 Thread Ayoub
Given a hive context you could execute:
hiveContext.sql(describe TABLE_NAME) you would get the name of the fields
and their types

2015-02-04 21:47 GMT+01:00 nitinkak001 nitinkak...@gmail.com:

 I want to get a Hive table schema details into Spark. Specifically, I want
 to
 get column name and type information. Is it possible to do it e.g using
 JavaSchemaRDD or something else?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-Hive-table-schema-using-Spark-SQL-or-otherwise-tp21501.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-How-to-get-Hive-table-schema-using-Spark-SQL-or-otherwise-tp21502.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Marcelo Vanzin
On Wed, Feb 4, 2015 at 1:12 PM, Koert Kuipers ko...@tresata.com wrote:
 about putting stuff on classpath before spark or yarn... yeah you can shoot
 yourself in the foot with it, but since the container is isolated it should
 be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with great
 success.

The container still has to use Hadoop libraries, e.g., to talk to
HDFS. If you override a library it needs with an incompatible one, you
may break something. So maybe you've just been lucky. :-)

In reality it should be pretty hard to cause breakages if you're
careful - e.g., when you override a jar of some library that generally
has multiple jars, such as Jackson, you need to include all of them,
not just the one(s) you need in your app.

MR also has an option similar to Spark's userClassPath (see
https://issues.apache.org/jira/browse/MAPREDUCE-1700), which doesn't
involve messing with the system's class path.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming - tracking/deleting processed files

2015-02-04 Thread ganterm
Thank you very much for the detailed answer. I feel a little dumb asking
but how would that work when using Scala (we use Spark 1.0.2)?
I can not figure it out. E.g. I am having trouble using UnionPartition
and NewHadoopPartition, and even ds.values() is not an option for me (in the
IDE). Do you have any Scala code that does something similar? Any help is
appreciated.
BTW: I am creating the dStream like this:
val ds = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), f,
true).map(_._2.toString)
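For what it's worth, the closest I could get in plain Scala avoids
UnionPartition/NewHadoopPartition entirely and just cleans up the watched
directory from inside foreachRDD. A rough, untested sketch; the directory is
the same args(0) as above, and the assumption that "modification time older
than the batch time" means "already picked up" is mine:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  ds.foreachRDD { (rdd, time) =>
    // ... normal processing of rdd goes here ...
    val fs = FileSystem.get(new Configuration())
    val watched = new Path(args(0))
    fs.listStatus(watched)
      .filter(_.getModificationTime < time.milliseconds)   // already picked up
      .foreach(status => fs.delete(status.getPath, false))
  }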

Thanks,
Markus

On Tue, Feb 3, 2015 at 4:55 AM, Prannoy [via Apache Spark User List] 
ml-node+s1001560n21478...@n3.nabble.com wrote:

 Hi,

 To keep processing the older file also you can use fileStream instead of
 textFileStream. It has a parameter to specify to look for already present
 files.

 For deleting the processed files one way is to get the list of all files
 in the dStream. This can be done by using the foreachRDD api of the dStream
 received from the fileStream(or textFileStream).

 Suppose the dStream is

 JavaDStream<String> jpDstream = ssc
     .textFileStream("path/to/your/folder/");

 jpDstream.print();

 jpDstream.foreachRDD(
     new Function<JavaRDD<String>, Void>() {
       @Override
       public Void call(JavaRDD<String> arg0) throws Exception {
         getContentHigh(arg0, ssc);
         return null;
       }
     }
 );

 public static void getContentHigh(JavaRDD<String> ds,
     JavaStreamingContext ssc) {

   // this gives the number of files the stream picked up
   int lenPartition = ds.rdd().partitions().length;

   for (int i = 0; i < lenPartition; i++) {
     UnionPartition upp = (UnionPartition) ds.rdd().partitions()[i];
     NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
     String fPath = npp.serializableHadoopSplit().value().toString();
     String[] nT = fPath.split(":");
     // name is the path of the file picked for processing. The processing
     // logic can be inside this loop; once done you can delete the file
     // using the path in the variable name.
     String name = nT[0];
   }
 }


 Thanks.

 On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List] 
 [hidden
 email] http:///user/SendEmail.jtp?type=nodenode=21478i=0 wrote:

 We are running a Spark streaming job that retrieves files from a
 directory (using textFileStream).
 One concern we are having is the case where the job is down but files are
 still being added to the directory.
 Once the job starts up again, those files are not being picked up (since
 they are not new or changed while the job is running) but we would like
 them to be processed.
 Is there a solution for that? Is there a way to keep track what files
 have been processed and can we force older files to be picked up? Is
 there a way to delete the processed files?

 Thanks!
 Markus

 --
  If you reply to this email, your message will be added to the
 discussion below:

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444.html
  To start a new topic under Apache Spark User List, email [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=21478i=1
 To unsubscribe from Apache Spark User List, click here.




 --
  If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444p21478.html
  To unsubscribe from Spark streaming - tracking/deleting processed files, 
 click
 here
 http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_codenode=21444code=Z2FudGVybUBnbWFpbC5jb218MjE0NDR8LTE4MTQ3NTI4NTM=
 .





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444p21504.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Koert Kuipers
i have never been a big fan of shading, since it leads to the same library
being packaged many times. for example, its not unusual to have ASM 10
times in a jar because of the shading policy they promote. and all that
because they broke one signature and without a really good reason.

i am a big fan of maintaining backwards compatibility and, when you don't,
of changing the maven artifact and java package, like jackson did when they went
from 1 to 2. now that makes sense :) you can even have jackson 1 and 2
together within the same project without any issues.



On Wed, Feb 4, 2015 at 3:44 PM, Corey Nolet cjno...@gmail.com wrote:

  leading to a real classloader hell if you tried to add a newer version
 of jar that spark already used.

 Spark at least makes this easier on the Guava side [1] by shading the
 package names internally so that there's no possibility of a conflict.
 Elasticsearch  Storm do this too for many of their dependencies and I
 think it's a great practice for libraries that are only used internally-
 specifically when those internal libraries are not exposed at all to the
 outside. If you are only using said libraries internally, that strategy may
 work for you as well, Koert. I'm going to ask about this on the Hadoop list
 as well to see if maybe there was a decision against it for reasons I
 haven't thought of.

  Another suggestion is to build Spark by yourself.

 I'm having trouble seeing what you mean here, Marcelo. Guava is already
 shaded to a different package for the 1.2.0 release. It shouldn't be
 causing conflicts.

 [1] https://issues.apache.org/jira/browse/SPARK-2848

 On Wed, Feb 4, 2015 at 2:35 PM, Koert Kuipers ko...@tresata.com wrote:

 the whole spark.files.userClassPathFirst never really worked for me in
 standalone mode, since jars were added dynamically which means they had
 different classloaders, leading to a real classloader hell if you tried to
 add a newer version of a jar that spark already used. see:
 https://issues.apache.org/jira/browse/SPARK-1863

 do i understand it correctly that on yarn the customer jars are truly
 placed before the yarn and spark jars on classpath? meaning at container
 construction time, on the same classloader? that would be great news for
 me. it would open up the possibility of using newer versions of many
 libraries.


 On Wed, Feb 4, 2015 at 1:12 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Hi Corey,

 When you run on Yarn, Yarn's libraries are placed in the classpath,
 and they have precedence over your app's. So, with Spark 1.2, you'll
 get Guava 11 in your classpath (with Spark 1.1 and earlier you'd get
 Guava 14 from Spark, so still a problem for you).

 Right now, the option Markus mentioned
 (spark.yarn.user.classpath.first) can be a workaround for you, since
 it will place your app's jars before Yarn's on the classpath.


 On Tue, Feb 3, 2015 at 8:20 PM, Corey Nolet cjno...@gmail.com wrote:
  I'm having a really bad dependency conflict right now with Guava
 versions
  between my Spark application in Yarn and (I believe) Hadoop's version.
 
  The problem is, my driver has the version of Guava which my
 application is
  expecting (15.0) while it appears the Spark executors that are working
 on my
  RDDs have a much older version (assuming it's the old version on the
 Hadoop
  classpath).
 
  Is there a property like mapreduce.job.user.classpath.first' that I
 can set
  to make sure my own classpath is extablished first on the executors?



 --
 Marcelo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Marcelo Vanzin
Hi Corey,

On Wed, Feb 4, 2015 at 12:44 PM, Corey Nolet cjno...@gmail.com wrote:
 Another suggestion is to build Spark by yourself.

 I'm having trouble seeing what you mean here, Marcelo. Guava is already
 shaded to a different package for the 1.2.0 release. It shouldn't be causing
 conflicts.

That wasn't my suggestion and I definitely do not recommend rebuilding
Spark to work around these issues. :-)

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark with cdh 5.2.1

2015-02-04 Thread Mohit Jaggi
yep...it was unnecessary to create a 2.5 profile. i struggled a bit because
it wasn't clear that i *need* to select a profile using -P option. i didn't
have to do that for earlier hadoop versions.

On Fri, Jan 30, 2015 at 12:11 AM, Sean Owen so...@cloudera.com wrote:

 There is no need for a 2.5 profile. The hadoop-2.4 profile is for
 Hadoop 2.4 and beyond. You can set the particular version you want
 with -Dhadoop.version=

 You do not need to make any new profile to compile vs 2.5.0-cdh5.2.1.
 Again, the hadoop-2.4 profile is what you need.

 On Thu, Jan 29, 2015 at 11:33 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:
  Hi All,
  I noticed in pom.xml that there is no entry for Hadoop 2.5. Has anyone
 tried Spark with 2.5.0-cdh5.2.1? Will replicating the 2.4 entry be
 sufficient to make this work?
 
  Mohit.
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Spark SQL taking long time to print records from a table

2015-02-04 Thread Imran Rashid
Many operations in spark are lazy -- most likely your collect() statement
is actually forcing evaluation of several steps earlier in the pipeline.
The logs and the UI might give you some info about all the stages that are
being run when you get to collect().

I think collect() is just fine if you are trying to pull just one record to
the driver; that shouldn't be a bottleneck.
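For example, a quick way to separate the two timings (a rough sketch using
the names from your snippet):

  acctNPIScr.cache()
  acctNPIScr.count()   // forces the joins/group-bys upstream to actually run
  acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println)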

On Wed, Feb 4, 2015 at 1:32 AM, jguliani jasminkguli...@gmail.com wrote:

 I have 3 text files in hdfs which I am reading using spark sql and
 registering them as table. After that I am doing almost 5-6 operations -
 including joins , group by etc.. And this whole process is taking hardly
 6-7
 secs. ( Source File size - 3 GB with almost 20 million rows ).
 As a final step of my computation, I am expecting only 1 record in my final
 rdd - named as acctNPIScr in below code snippet.

 My question here is that when I am trying to print this rdd, either by
 registering it as a table and printing records from the table or by this method -
 acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println) - it is
 taking a very long time, almost 1.5 minutes, to print 1 record.

 Can someone pls help me if I am doing something wrong in printing. What is
 the best way to print final result from schemardd.

 .
 val acctNPIScr = sqlContext.sql("SELECT party_id,
 sum(npi_int)/sum(device_priority_new) as npi_score FROM AcctNPIScoreTemp
 group by party_id")
 acctNPIScr.registerTempTable("AcctNPIScore")

 val endtime = System.currentTimeMillis()
 logger.info("Total sql Time : " + (endtime - st))   // this time is
 hardly 5 secs

 println("start printing")

 val result = sqlContext.sql("SELECT * FROM
 AcctNPIScore").collect().foreach(println)

 //acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println)

 logger.info("Total printing Time : " + (System.currentTimeMillis() -
 endtime)) // printing one record is taking almost 1.5 minutes




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-taking-long-time-to-print-records-from-a-table-tp21493.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Tableau beta connector

2015-02-04 Thread ashu
Hi,
I am trying out the tableau beta connector to Spark SQL. I have few basics
question:
Will this connector be able to fetch the schemaRDDs into tableau.
Will all the schemaRDDs be exposed to tableau?
Basically I am not getting what tableau will fetch at data-source? Is it
existing files in HDFS? RDDs or something else.
Question may be naive but I did not get answer anywhere else. Would really
appreciate if someone has already tried it, can help me with this.

Thanks,
Ashutosh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



New combination-like RDD based on two RDDs

2015-02-04 Thread dash
Hey Spark gurus! Sorry for the confusing title. I do not know the exactly
description of my problem, if you know please tell me so I can change it :-)

Say I have two RDDs right now, and they are

val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6))))
val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9))))

I want combine rdd1 and rdd2 to get rdd3 which looks like

List((1,(3)), (2,(5,1)), (2,(5,3)), (3, (6,9)))

The order in _._2 does not matter, so you can treat it as a Set.

I tried to use zip, but since there is no guarantee that the length of rdd1
and rdd2 will be the same I do not know if it is doable.

Also I looked into PairedRDD, some people use union operation on two RDDs
and then apply a map function on it. Since I want all combinations according
to _._1, I do not know how to achieve it by union and map.

Thanks in advance!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Large # of tasks in groupby on single table

2015-02-04 Thread Manoj Samel
Follow up for closure on thread ...

1. spark.sql.shuffle.partitions is not on config page but is mentioned on
http://spark.apache.org/docs/1.2.0/sql-programming-guide.html. Would be
better to have it in config page as well for sake of completeness. Should I
file a doc bug ?
2. Regarding my #2 above (Spark should auto-determine the # of tasks), there
is already a write-up on the SQL Programming page under Hive optimizations not in
Spark: "Automatically determine the number of reducers for joins and
groupbys: Currently in Spark SQL, you need to control the degree of
parallelism post-shuffle using 'SET
spark.sql.shuffle.partitions=[num_tasks];'". Any idea if and when this is
scheduled? Even a rudimentary implementation (e.g. based on the # of
partitions of the underlying RDD, which is available now) would be an
improvement over the current fixed 200 and would be a critical feature for
SparkSQL feasibility
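For now the workaround looks like this before each query (a minimal sketch;
the 2 just matches the number of input partitions in my earlier example):

  sqlContext.sql("SET spark.sql.shuffle.partitions=2")
  val grouped = sqlContext.sql("select a, sum(b), sum(c) from x group by a")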

Thanks

On Wed, Feb 4, 2015 at 4:09 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Awesome ! By setting this, I could minimize the collect overhead, e.g by
 setting it to # of partitions of the RDD.

 Two questions

 1. I had looked for such option in
 http://spark.apache.org/docs/latest/configuration.html but this is not
 documented. Seems this a doc. bug ?
 2. Ideally the shuffle partitions should be derive from underlying
 table(s) and a optimal number should be set for each query. Having one
 number across all queries is not ideal, nor do the consumer wants to set it
 before each query to different #. Any thoughts ?


 Thanks !

 On Wed, Feb 4, 2015 at 3:50 PM, Ankur Srivastava 
 ankur.srivast...@gmail.com wrote:

 Hi Manoj,

 You can set the number of partitions you want your sql query to use. By
 default it is 200 and thus you see that number. You can update it using the
 spark.sql.shuffle.partitions property

 spark.sql.shuffle.partitions (default: 200) - Configures the number of partitions to
 use when shuffling data for joins or aggregations.

 Thanks
 Ankur

 On Wed, Feb 4, 2015 at 3:41 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi,

 Any thoughts on this?

 Most of the 200 tasks in collect phase take less than 20 ms and lot of
 time is spent on scheduling these. I suspect the overall time will reduce
 if # of tasks are dropped to a much smaller # (close to # of partitions?)

 Any help is appreciated

 Thanks

 On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Spark 1.2
 Data is read from parquet with 2 partitions and is cached as table with
 2 partitions. Verified in UI that it shows RDD with 2 partitions  it is
 fully cached in memory

 Cached data contains column a, b, c. Column a has ~150 distinct values.

 Next run SQL on this table as select a, sum(b), sum(c) from table x

 The query creates 200 tasks. Further, the advanced metric scheduler
 delay is significant % for most of these tasks. This seems very high
 overhead for query on RDD with 2 partitions

 It seems if this is run with less number of task, the query should run
 faster ?

 Any thoughts on how to control # of partitions for the group by (or
 other SQLs) ?

 Thanks,







synchronously submitting spark jobs

2015-02-04 Thread ey-chih chow
Hi,

I would like to submit spark jobs one by one, so that the next job will not
be submitted until the previous one succeeds.  spark-submit can only submit
jobs asynchronously.  Is there any way I can submit jobs sequentially?
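(The closest thing I can think of is driving the submissions from an outside
script and checking exit codes; a rough sketch of what I mean, with
placeholder class/jar names, assuming the jobs run in a mode where
spark-submit blocks until the application finishes:)

  import scala.sys.process._

  val jobs = Seq(
    Seq("./spark/bin/spark-submit", "--class", "JobOne", "myApp.jar"),
    Seq("./spark/bin/spark-submit", "--class", "JobTwo", "myApp.jar")
  )

  jobs.foreach { cmd =>
    val exitCode = cmd.!            // runs the command and waits for it
    require(exitCode == 0, "job failed: " + cmd.mkString(" "))
  }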
Thanks.

Ey-Chih Chow



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/synchronously-submitting-spark-jobs-tp21507.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Large # of tasks in groupby on single table

2015-02-04 Thread Ankur Srivastava
Hi Manoj,

You can set the number of partitions you want your sql query to use. By
default it is 200 and thus you see that number. You can update it using the
spark.sql.shuffle.partitions property

spark.sql.shuffle.partitions (default: 200) - Configures the number of partitions to use
when shuffling data for joins or aggregations.

Thanks
Ankur

On Wed, Feb 4, 2015 at 3:41 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Hi,

 Any thoughts on this?

 Most of the 200 tasks in collect phase take less than 20 ms and lot of
 time is spent on scheduling these. I suspect the overall time will reduce
 if # of tasks are dropped to a much smaller # (close to # of partitions?)

 Any help is appreciated

 Thanks

 On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Spark 1.2
 Data is read from parquet with 2 partitions and is cached as table with 2
 partitions. Verified in UI that it shows RDD with 2 partitions  it is
 fully cached in memory

 Cached data contains column a, b, c. Column a has ~150 distinct values.

 Next run SQL on this table as select a, sum(b), sum(c) from table x

 The query creates 200 tasks. Further, the advanced metric scheduler
 delay is significant % for most of these tasks. This seems very high
 overhead for query on RDD with 2 partitions

 It seems if this is run with less number of task, the query should run
 faster ?

 Any thoughts on how to control # of partitions for the group by (or other
 SQLs) ?

 Thanks,





Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Koert Kuipers
anyhow i am ranting... sorry

On Wed, Feb 4, 2015 at 5:54 PM, Koert Kuipers ko...@tresata.com wrote:

 yeah i think we have been lucky so far. but i dont really see how i have a
 choice. it would be fine if say hadoop exposes a very small set of
 libraries as part of the classpath. but if i look at the jars on hadoop
 classpath its a ton! and why? why does parquet need to be included with
 hadoop for example? or avro? it just makes my life harder. and i dont
 really see who benefits.

 the yarn classpath is insane too.

 On Wed, Feb 4, 2015 at 4:26 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Wed, Feb 4, 2015 at 1:12 PM, Koert Kuipers ko...@tresata.com wrote:
  about putting stuff on classpath before spark or yarn... yeah you can
 shoot
  yourself in the foot with it, but since the container is isolated it
 should
  be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with
 great
  success.

 The container still has to use Hadoop libraries, e.g., to talk to
 HDFS. If you override a library it needs with an incompatible one, you
 may break something. So maybe you've just been lucky. :-)

 In reality it should be pretty hard to cause breakages if you're
 careful - e.g., when you override a jar of some library that generally
 has multiple jars, such as Jackson, you need to include all of them,
 not just the one(s) you need in your app.

 MR also has an option similar to Spark's userClassPath (see
 https://issues.apache.org/jira/browse/MAPREDUCE-1700), which doesn't
 involve messing with the system's class path.

 --
 Marcelo





Re: Large # of tasks in groupby on single table

2015-02-04 Thread Manoj Samel
Hi,

Any thoughts on this?

Most of the 200 tasks in collect phase take less than 20 ms and lot of time
is spent on scheduling these. I suspect the overall time will reduce if #
of tasks are dropped to a much smaller # (close to # of partitions?)

Any help is appreciated

Thanks

On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Spark 1.2
 Data is read from parquet with 2 partitions and is cached as table with 2
 partitions. Verified in UI that it shows RDD with 2 partitions  it is
 fully cached in memory

 Cached data contains column a, b, c. Column a has ~150 distinct values.

 Next run SQL on this table as select a, sum(b), sum(c) from table x

 The query creates 200 tasks. Further, the advanced metric scheduler
 delay is significant % for most of these tasks. This seems very high
 overhead for query on RDD with 2 partitions

 It seems if this is run with less number of task, the query should run
 faster ?

 Any thoughts on how to control # of partitions for the group by (or other
 SQLs) ?

 Thanks,



Re: Problem with changing the akka.framesize parameter

2015-02-04 Thread Shixiong Zhu
The unit of spark.akka.frameSize is MB. The max value is 2047.
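For example, a minimal sketch (the value is interpreted as MB):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.akka.frameSize", "1024")   // MB, must be <= 2047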

Best Regards,
Shixiong Zhu

2015-02-05 1:16 GMT+08:00 sahanbull sa...@skimlinks.com:

 I am trying to run a spark application with

 -Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000
 -Dspark.akka.frameSize=1

 and the job fails because one or more of the akka frames are larger than
 1mb (12000 ish).

 When I change the Dspark.akka.frameSize=1 to 12000,15000 and 2 and
 RUN:

 ./spark/bin/spark-submit  --driver-memory 30g --executor-memory 30g
 mySparkCode.py

 I get an error in the startup as :


 ERROR OneForOneStrategy: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and ha
 s constructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 java.lang.IllegalArgumentException: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and has const
 ructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)
 at

 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
 at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
 at scala.util.Try$.apply(Try.scala:161)
 at scala.util.Failure.recover(Try.scala:185)
 at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
 at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
 at

 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
 at

 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.IllegalArgumentException: requirement failed: Setting
 'maximum-frame-size' must be at least 32000 bytes
 at scala.Predef$.require(Predef.scala:233)
 at
 akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)
 at

 org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
 at

 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at
 org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)
 at org.apache.spark.SparkContext.init(SparkContext.scala:203)
 at

 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at

 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at

 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:214)
 at

 py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
 at
 

Re: Problem with changing the akka.framesize parameter

2015-02-04 Thread Shixiong Zhu
Could you clarify why you need a 10G akka frame size?

Best Regards,
Shixiong Zhu

2015-02-05 9:20 GMT+08:00 Shixiong Zhu zsxw...@gmail.com:

 The unit of spark.akka.frameSize is MB. The max value is 2047.

 Best Regards,
 Shixiong Zhu

 2015-02-05 1:16 GMT+08:00 sahanbull sa...@skimlinks.com:

 I am trying to run a spark application with

 -Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000
 -Dspark.akka.frameSize=1

 and the job fails because one or more of the akka frames are larger than
 1mb (12000 ish).

 When I change the Dspark.akka.frameSize=1 to 12000,15000 and 2 and
 RUN:

 ./spark/bin/spark-submit  --driver-memory 30g --executor-memory 30g
 mySparkCode.py

 I get an error in the startup as :


 ERROR OneForOneStrategy: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and ha
 s constructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 java.lang.IllegalArgumentException: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and has const
 ructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)
 at

 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
 at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
 at scala.util.Try$.apply(Try.scala:161)
 at scala.util.Failure.recover(Try.scala:185)
 at
 akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
 at
 akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
 at

 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
 at

 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.IllegalArgumentException: requirement failed: Setting
 'maximum-frame-size' must be at least 32000 bytes
 at scala.Predef$.require(Predef.scala:233)
 at
 akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)
 at

 org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
 at

 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
 at
 scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at
 org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)
 at org.apache.spark.SparkContext.init(SparkContext.scala:203)
 at

 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at

 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at

 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at 

Is there a way to access Hive UDFs in a HiveContext?

2015-02-04 Thread vishpatel
I'm trying to access a permanent Hive UDF,

scala> val data = hc.sql("select func.md5(some_string) from some_table")
data: org.apache.spark.sql.SchemaRDD =
SchemaRDD[19] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
java.lang.RuntimeException: Couldn't find function func.md5

Version: Spark 1.2 on CDH 5.3.
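(One workaround I am considering is registering it as a temporary function
for the session; a rough sketch with placeholder jar path and class name,
assuming ADD JAR and CREATE TEMPORARY FUNCTION are passed through to Hive by
the HiveContext:)

  hc.sql("ADD JAR /path/to/my-udfs.jar")
  hc.sql("CREATE TEMPORARY FUNCTION md5 AS 'com.example.hive.udf.Md5'")
  val data = hc.sql("select md5(some_string) from some_table")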





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-access-Hive-UDFs-in-a-HiveContext-tp21510.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: New combination-like RDD based on two RDDs

2015-02-04 Thread dash
Problem solved. A simple join will do the work

val prefix = new PairRDDFunctions[Int, Set[Int]](sc.parallelize(List((9,
Set(4)), (1,Set(3)), (2,Set(5)), (2,Set(4)))))

val suffix = sc.parallelize(List((1, Set(1)), (2, Set(6)), (2, Set(5)), (2,
Set(7))))

prefix.join(suffix).collect().foreach(println)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508p21511.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



pyspark - gzip output compression

2015-02-04 Thread Kane Kim
How to save RDD with gzip compression?

Thanks.


Re: Tableau beta connector

2015-02-04 Thread Ashutosh Trivedi (MT2013030)
Thanks Denny and Ismail.


Denny, I went through your blog, it was a great help. I guess the tableau beta 
connector is also following the same procedure you described in the blog. I am 
building Spark now.

Basically what I don't get is where to put my data so that tableau can extract it.


So Ismail, it's just Spark SQL, no RDDs. I think I am getting it now. We use 
spark for our big data processing and we want the processed data (RDDs) in 
tableau. So we should put our data in the hive metastore and tableau will extract 
it from there using this connector? Correct me if I am wrong.


I guess I have to look at how thrift server works.


From: Denny Lee denny.g@gmail.com
Sent: Thursday, February 5, 2015 12:20 PM
To: İsmail Keskin; Ashutosh Trivedi (MT2013030)
Cc: user@spark.apache.org
Subject: Re: Tableau beta connector

Some quick context behind how Tableau interacts with Spark / Hive can also be 
found at https://www.concur.com/blog/en-us/connect-tableau-to-sparksql  - its 
for how to connect from Tableau to the thrift server before the official 
Tableau beta connector but should provide some of the additional context called 
out.   HTH!

On Wed Feb 04 2015 at 10:47:23 PM İsmail Keskin 
ismail.kes...@dilisim.com wrote:
Tableau connects to Spark Thrift Server via an ODBC driver. So, none of the RDD 
stuff applies, you just issue SQL queries from Tableau.

The table metadata can come from Hive Metastore if you place your hive-site.xml 
to configuration directory of Spark.

On Thu, Feb 5, 2015 at 8:11 AM, ashu 
ashutosh.triv...@iiitb.org wrote:
Hi,
I am trying out the tableau beta connector to Spark SQL. I have few basics
question:
Will this connector be able to fetch the schemaRDDs into tableau.
Will all the schemaRDDs be exposed to tableau?
Basically I am not getting what tableau will fetch at data-source? Is it
existing files in HDFS? RDDs or something else.
Question may be naive but I did not get answer anywhere else. Would really
appreciate if someone has already tried it, can help me with this.

Thanks,
Ashutosh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Tableau beta connector

2015-02-04 Thread İsmail Keskin
Tableau connects to Spark Thrift Server via an ODBC driver. So, none of the
RDD stuff applies, you just issue SQL queries from Tableau.

The table metadata can come from Hive Metastore if you place your
hive-site.xml to configuration directory of Spark.

On Thu, Feb 5, 2015 at 8:11 AM, ashu ashutosh.triv...@iiitb.org wrote:

 Hi,
 I am trying out the tableau beta connector to Spark SQL. I have few basics
 question:
 Will this connector be able to fetch the schemaRDDs into tableau.
 Will all the schemaRDDs be exposed to tableau?
 Basically I am not getting what tableau will fetch at data-source? Is it
 existing files in HDFS? RDDs or something else.
 Question may be naive but I did not get answer anywhere else. Would really
 appreciate if someone has already tried it, can help me with this.

 Thanks,
 Ashutosh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Job running on localhost on yarn cluster

2015-02-04 Thread Felix C
Is YARN_CONF_DIR set?

--- Original Message ---

From: Aniket Bhatnagar aniket.bhatna...@gmail.com
Sent: February 4, 2015 6:16 AM
To: kundan kumar iitr.kun...@gmail.com, spark users 
user@spark.apache.org
Subject: Re: Spark Job running on localhost on yarn cluster

Have you set master in SparkConf/SparkContext in your code? Driver logs
show in which mode the spark job is running. Double check if the logs
mention local or yarn-cluster.
Also, what's the error that you are getting?
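For example, a sketch of what the driver code should look like (EDDApp is the
class from your command; the point is just not to hard-code a master):

  // Leave the master out of the code so --master yarn-cluster takes effect.
  val conf = new org.apache.spark.SparkConf().setAppName("EDDApp")
  // no .setMaster("local[*]") here
  val sc = new org.apache.spark.SparkContext(conf)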

On Wed, Feb 4, 2015, 6:13 PM kundan kumar iitr.kun...@gmail.com wrote:

 Hi,

 I am trying to execute my code on a yarn cluster

 The command which I am using is

 $SPARK_HOME/bin/spark-submit --class EDDApp
 target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster
 --num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath

 But, I can see that this program is running only on the localhost.

 Its able to read the file from hdfs.

 I have tried this in standalone mode and it works fine.

 Please suggest where is it going wrong.


 Regards,
 Kundan



Re: Tableau beta connector

2015-02-04 Thread Denny Lee
The context is that you would create your RDDs and then persist them in
Hive. Once in Hive, the data is accessible from the Tableau extract through
Spark thrift server.
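A rough sketch of that flow with the 1.2-era SchemaRDD API (the case class,
paths and table name below are placeholders):

  import org.apache.spark.sql.hive.HiveContext

  case class Record(id: Int, value: String)             // placeholder schema

  val hc = new HiveContext(sc)
  import hc.createSchemaRDD                              // RDD of case classes -> SchemaRDD

  val processed = sc.textFile("hdfs:///path/to/input")   // placeholder path
    .map(_.split("\t"))
    .map(a => Record(a(0).toInt, a(1)))

  // Persist into the Hive metastore; Tableau then sees "records" through the
  // Spark Thrift server.
  processed.saveAsTable("records")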
On Wed, Feb 4, 2015 at 23:36 Ashutosh Trivedi (MT2013030) 
ashutosh.triv...@iiitb.org wrote:

  Thanks Denny and Ismail.


  Denny ,I went through your blog, It was great help. I guess tableau beta
 connector also following the same procedure,you described in blog. I am
 building the Spark now.

 Basically what I don't get is, where to put my data so that tableau can
 extract.


  So  Ismail,its just Spark SQL. No RDDs I think I am getting it now . We
 use spark for our big data processing and we want *processed data (Rdd)*
 into tableau. So we should put our data in hive metastore and tableau will
 extract it from there using this connector? Correct me if I am wrong.


  I guess I have to look at how thrift server works.
  --
 *From:* Denny Lee denny.g@gmail.com
 *Sent:* Thursday, February 5, 2015 12:20 PM
 *To:* İsmail Keskin; Ashutosh Trivedi (MT2013030)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Tableau beta connector

  Some quick context behind how Tableau interacts with Spark / Hive can
 also be found at
 https://www.concur.com/blog/en-us/connect-tableau-to-sparksql  - its for
 how to connect from Tableau to the thrift server before the official
 Tableau beta connector but should provide some of the additional context
 called out.   HTH!

 On Wed Feb 04 2015 at 10:47:23 PM İsmail Keskin ismail.kes...@dilisim.com
 wrote:

 Tableau connects to Spark Thrift Server via an ODBC driver. So, none of
 the RDD stuff applies, you just issue SQL queries from Tableau.

  The table metadata can come from Hive Metastore if you place your
 hive-site.xml to configuration directory of Spark.

 On Thu, Feb 5, 2015 at 8:11 AM, ashu ashutosh.triv...@iiitb.org wrote:

 Hi,
 I am trying out the tableau beta connector to Spark SQL. I have few
 basics
 question:
 Will this connector be able to fetch the schemaRDDs into tableau.
 Will all the schemaRDDs be exposed to tableau?
 Basically I am not getting what tableau will fetch at data-source? Is it
 existing files in HDFS? RDDs or something else.
 Question may be naive but I did not get answer anywhere else. Would
 really
 appreciate if someone has already tried it, can help me with this.

 Thanks,
 Ashutosh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Whether standalone spark support kerberos?

2015-02-04 Thread Jander g
Hope someone helps me. Thanks.

On Wed, Feb 4, 2015 at 6:14 PM, Jander g jande...@gmail.com wrote:

 We have a standalone spark cluster for kerberos test.

 But when reading from hdfs, i get error output: Can't get Master Kerberos
 principal for use as renewer.

 So Whether standalone spark support kerberos? can anyone confirm it? or
 what i missed?

 Thanks in advance.

 --
 Thanks,
 Jander




-- 
Thanks,
Jander


How many stages in my application?

2015-02-04 Thread Joe Wass
I'm sitting here looking at my application crunching gigabytes of data on a
cluster and I have no idea if it's an hour away from completion or a
minute. The web UI shows progress through each stage, but not how many
stages remaining. How can I work out how many stages my program will take
automatically?

My application has a slightly interesting DAG (re-use of functions that
contain Spark transformations, persistent RDDs). Not that complex, but not
'step 1, step 2, step 3'.

I'm guessing that if the driver program runs sequentially sending messages
to Spark, then Spark has no knowledge of the structure of the driver
program. Therefore it's necessary to execute it on a small test dataset and
see how many stages result?

When I set spark.eventLog.enabled = true and run on (very small) test data
I don't get any stage messages in my STDOUT or in the log file. This is on
a `local` instance.

Did I miss something obvious?

Thanks!

Joe


Joining piped RDDs

2015-02-04 Thread Pavel Velikhov
Hi!

  We have a common use case with Spark - we go out to some database, e.g. 
Cassandra, crunch through all of its data, but along the RDD pipeline we use a 
pipe operator to some script. All the data before the pipe has some unique IDs, 
but inside the pipe everything is lost.

  The only current solution we have is to format the data going into the pipe so it 
includes the ids, and then restore it all in a map after the pipe.
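(Roughly, what we do today looks like this; process.sh is a placeholder and is
assumed to echo the leading id field back out unchanged:)

  val withIds = sc.parallelize(Seq((1L, "row one"), (2L, "row two")))

  val restored = withIds
    .map { case (id, payload) => id + "\t" + payload }   // put the id into the line
    .pipe("./process.sh")                                // placeholder script
    .map { line =>
      val Array(id, rest) = line.split("\t", 2)          // split the id back out
      (id.toLong, rest)
    }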

  It would be much nicer if we could just join/zip back the output of 
the pipe. However, we can't cache the RDDs, so it would be nice to have a 
forkRDD of some sort that only keeps the last partition in cache (since we're 
guaranteed that there'll be a zip later on and the dataflow will be 
synchronized). Or maybe we can already do this in Spark?

Thank you,
Pavel Velikhov
Chief Science Officer
TopRater
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Exception in thread main java.lang.SecurityException: class javax.servlet.ServletRegistration'

2015-02-04 Thread Sean Owen
It means you did not exclude the Servlet APIs from some dependency in
your app, and one of them is bringing it in every time. Look at the
dependency tree and exclude whatever brings in javax.servlet.

It should be already available in Spark, and the particular
javax.servlet JAR from Oracle has signing info that you have to strip
out, or simply exclude the whole thing.
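For example, in build.sbt (the dependency below is a placeholder for whichever
one the dependency tree shows as the culprit):

  // Sketch only: exclude the servlet API from the offending dependency.
  libraryDependencies += "org.example" % "some-library" % "1.0" excludeAll(
    ExclusionRule(organization = "javax.servlet")
  )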

On Wed, Feb 4, 2015 at 1:20 AM, DEVAN M.S. msdeva...@gmail.com wrote:
 HI all,
 I need a help.

 When i am trying to run a spark project it shows: Exception in
 thread main java.lang.SecurityException: class
 javax.servlet.ServletRegistration's signer information does not match
 signer information of other classes in the same package.
 After deleting the /home/devan/.ivy2/cache/javax.servlet folder,
 things are working...

 Don't know what happened..
 Please help, because on each restart the same folder comes back.
 Found this:
 http://stackoverflow.com/questions/2877262/java-securityexception-signer-information-does-not-match

 Which one makes the conflict?? These are the libraries I am using:

 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"

 libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.1.0"

 libraryDependencies += "com.googlecode.json-simple" % "json-simple" % "1.1.1"

 libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.1.0"

 libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.1.0"

 libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.1.0"

 libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"

 libraryDependencies += "org.apache.spark" % "spark-streaming-mqtt_2.10" % "1.1.0"

 libraryDependencies += "org.tachyonproject" % "tachyon" % "0.5.0"

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Job running on localhost on yarn cluster

2015-02-04 Thread kundan kumar
Hi,

I am trying to execute my code on a yarn cluster

The command which I am using is

$SPARK_HOME/bin/spark-submit --class EDDApp
target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster
--num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath

But, I can see that this program is running only on the localhost.

Its able to read the file from hdfs.

I have tried this in standalone mode and it works fine.

Please suggest where is it going wrong.


Regards,
Kundan


Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Corey Nolet
Bo yang-

I am using Spark 1.2.0, and undoubtedly there are older Guava classes being
picked up and serialized with the closures when they are sent from the
driver to the executors, because the class serial version ids don't match
between the driver and the executors. Have you tried doing this? Guava
works fine for me when this is not the case, but as soon as a Guava class
that changed in version 15.0 is serialized, it fails. See [1] for more
info; we did fairly extensive testing last night. I've isolated the issue
to Hadoop's really old version of Guava being picked up. Again, this is
only noticeable when classes are used from Guava 15.0 that were changed
from previous versions and those classes are serialized on the driver
and shipped to the executors.


[1] https://github.com/calrissian/mango/issues/158

On Wed, Feb 4, 2015 at 1:31 AM, bo yang bobyan...@gmail.com wrote:

 Corey,

 Which version of Spark do you use? I am using Spark 1.2.0, and  guava
 15.0. It seems fine.

 Best,
 Bo


 On Tue, Feb 3, 2015 at 8:56 PM, M. Dale medal...@yahoo.com.invalid
 wrote:

  Try spark.yarn.user.classpath.first (see
 https://issues.apache.org/jira/browse/SPARK-2996 - only works for YARN).
 Also thread at
 http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html
 .

 HTH,
 Markus

 On 02/03/2015 11:20 PM, Corey Nolet wrote:

 I'm having a really bad dependency conflict right now with Guava versions
 between my Spark application in Yarn and (I believe) Hadoop's version.

  The problem is, my driver has the version of Guava which my application
 is expecting (15.0) while it appears the Spark executors that are working
 on my RDDs have a much older version (assuming it's the old version on the
 Hadoop classpath).

  Is there a property like mapreduce.job.user.classpath.first' that I
 can set to make sure my own classpath is extablished first on the executors?






Re: Multiple running SparkContexts detected in the same JVM!

2015-02-04 Thread Sean Owen
It means what it says. You should not have multiple SparkContexts
running in one JVM. It was always the wrong thing to do, but now is an
explicit error. When you run spark-shell, you already have a
SparkContext (sc) so there is no need to make another one. Just don't
do that.
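For example, in the shell (a trivial sketch; the path is a placeholder):

  // val sc = new SparkContext(conf)           // don't do this in spark-shell
  val lines = sc.textFile("hdfs:///some/path") // use the `sc` the shell provides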

On Wed, Feb 4, 2015 at 12:20 AM, gavin zhang gavin@gmail.com wrote:
 I have a cluster which running CDH5.1.0 with Spark component.
 Because the default version of Spark from CDH5.1.0 is 1.0.0 while I want to
 use some features of Spark 1.2.0, I compiled another Spark with Maven.
 But when I ran spark-shell and created a new SparkContext, I got the
 error below:

 15/02/04 14:08:19 WARN SparkContext: Multiple running SparkContexts detected
 in the same JVM!
 org.apache.spark.SparkException: Only one SparkContext may be running in
 this JVM (see SPARK-2243). To ignore this error, set
 spark.driver.allowMultipleContexts = true. The currently running
 SparkContext was created at
 ...

 And I tried to delete the default Spark and to set the
 spark.driver.allowMultipleContexts = true option, but it didn't
 work.

 How could I fix it?





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-running-SparkContexts-detected-in-the-same-JVM-tp21492.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problems with GC and time to execute with different number of executors.

2015-02-04 Thread Guillermo Ortiz
I execute a job in Spark where I'm processing a file of 80Gb in HDFS.
I have 5 slaves:
(32cores /256Gb / 7physical disks) x 5

I have been trying many different configurations with YARN.
yarn.nodemanager.resource.memory-mb 196Gb
yarn.nodemanager.resource.cpu-vcores 24

I have tried to execute the job with different numbers of executors and
memory (1-4g).
With 20 executors each iteration (128mb) takes 25s and it never spends a
really long time waiting on GC.

When I execute around 60 executors the processing time is about 45s and
some tasks take up to one minute because of GC.

I have no idea why GC kicks in when I execute more executors simultaneously.
The other question is why it takes more time to execute each
block. My theory is that it's because there are only 7 physical
disks, and 5 processes writing is not the same as 20.

The code is pretty simple, it's just a map function which parses a line
and writes the output to HDFS. There are a lot of substring operations inside
the function, which could cause GC.

Any theory about?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Whether standalone spark support kerberos?

2015-02-04 Thread Jander g
We have a standalone spark cluster for kerberos test.

But when reading from hdfs, i get error output: Can't get Master Kerberos
principal for use as renewer.

So Whether standalone spark support kerberos? can anyone confirm it? or
what i missed?

Thanks in advance.

-- 
Thanks,
Jander


Re: 2GB limit for partitions?

2015-02-04 Thread Imran Rashid
Hi Mridul,


do you think you'll keep working on this, or should this get picked up by
others?  Looks like there was a lot of work put into LargeByteBuffer, seems
promising.

thanks,
Imran

On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com
wrote:

 That is fairly out of date (we used to run some of our jobs on it ... But
 that is forked off 1.1 actually).

 Regards
 Mridul


 On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

 Thanks for the explanations, makes sense.  For the record looks like this
 was worked on a while back (and maybe the work is even close to a
 solution?)

 https://issues.apache.org/jira/browse/SPARK-1476

 and perhaps an independent solution was worked on here?

 https://issues.apache.org/jira/browse/SPARK-1391


 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

  cc dev list
 
 
  How are you saving the data? There are two relevant 2GB limits:
 
  1. Caching
 
  2. Shuffle
 
 
  For caching, a partition is turned into a single block.
 
  For shuffle, each map partition is partitioned into R blocks, where R =
  number of reduce tasks. It is unlikely a shuffle block  2G, although it
  can still happen.
 
  I think the 2nd problem is easier to fix than the 1st, because we can
  handle that in the network transport layer. It'd require us to divide
 the
  transfer of a very large block into multiple smaller blocks.
 
 
 
  On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com
 wrote:
 
  Michael,
 
  you are right, there is definitely some limit at 2GB.  Here is a
 trivial
  example to demonstrate it:
 
  import org.apache.spark.storage.StorageLevel
  val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
  Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
  d.count()
 
  It gives the same error you are observing.  I was under the same
  impression as Sean about the limits only being on blocks, not
 partitions --
  but clearly that isn't the case here.
 
  I don't know the whole story yet, but I just wanted to at least let you
  know you aren't crazy :)
  At the very least this suggests that you might need to make smaller
  partitions for now.
 
  Imran
 
 
  On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
  m_albert...@yahoo.com.invalid wrote:
 
  Greetings!
 
  Thanks for the response.
 
  Below is an example of the exception I saw.
  I'd rather not post code at the moment, so I realize it is completely
  unreasonable to ask for a diagnosis.
  However, I will say that adding a partitionBy() was the last change
  before this error was created.
 
 
  Thanks for your time and any thoughts you might have.
 
  Sincerely,
   Mike
 
 
 
  Exception in thread main org.apache.spark.SparkException: Job
 aborted
  due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
  failure: Lost task 4.3 in stage 5.0 (TID 6012,
  ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at
 
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
  scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 
 
--
   *From:* Sean Owen so...@cloudera.com
  *To:* Michael Albert m_albert...@yahoo.com
  *Cc:* user@spark.apache.org user@spark.apache.org
  *Sent:* Monday, February 2, 2015 10:13 PM
  *Subject:* Re: 2GB limit for partitions?

 
  The limit is on blocks, not partitions. Partitions have many blocks.
 
  It sounds like you are creating very large values in memory, but I'm
  not sure given your description. You will run into problems if a
  single object is more than 2GB, of course. More of the stack trace
  might show what is mapping that much memory.
 
  If you simply want data into 1000 files it's a lot simpler. Just
  repartition into 1000 partitions and save the data. If you need more
  control over what goes into which partition, 
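 
   A minimal sketch of the simple case above, assuming rdd is the RDD being
   written and the output path is a placeholder:
 
   rdd.repartition(1000).saveAsTextFile("hdfs:///tmp/one-thousand-files")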

Re: Sort based shuffle not working properly?

2015-02-04 Thread Imran Rashid
I think you are interested in secondary sort, which is still being worked
on:

https://issues.apache.org/jira/browse/SPARK-3655
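
In the meantime, a minimal sketch of one way to get that per-partition
ordering, assuming Spark 1.2+ (where repartitionAndSortWithinPartitions is
available), a spark-shell sc, and placeholder file paths:

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._   // pair/ordered RDD implicits on 1.2

// Partition only on c1; the implicit Ordering of the (c1, c2) tuple key then
// sorts the rows of each partition by c1 and c2 during the shuffle itself.
class C1Partitioner(parts: Int) extends Partitioner {
  def numPartitions: Int = parts
  def getPartition(key: Any): Int = key match {
    case (c1: Int, _) => c1 % parts       // assumes non-negative c1 values
  }
}

val keyed = sc.textFile("input.tsv").map { line =>   // "input.tsv" is a placeholder
  val arr = line.split("\t")
  ((arr(0).toInt, arr(1).toInt), line)               // composite key (c1, c2)
}
val sorted = keyed.repartitionAndSortWithinPartitions(new C1Partitioner(10))
sorted.values.saveAsTextFile("secondary-sorted")     // placeholder output path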

On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote:

 I thought that's what the sort-based shuffle did: sort the keys going to the
 same partition.

 I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that
 ordering of c2 type is the problem here.

 On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?

 On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com
 wrote:
  I am trying to implement secondary sort in spark as we do in map-reduce.
 
  Here is my data (tab separated, without the c1, c2, c3 header row):
  c1  c2  c3
  1   2   4
  1   3   6
  2   4   7
  2   6   8
  3   5   5
  3   1   8
  3   2   0
 
  To do secondary sort, I create a paired RDD as
 
  /((c1 + "," + c2), row)/
 
  and then use a custom partitioner to partition only on c1. I have set
  /spark.shuffle.manager = SORT/ so the keys per partition are sorted.
 For the
  key 3 I am expecting to get
  (3, 1)
  (3, 2)
  (3, 5)
  but still getting the original order
  3,5
  3,1
  3,2
 
  Here is the custom partitioner code:
 
  /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner
 {
def numPartitions = p
def getPartition(key: Any) = {
    key.asInstanceOf[String].split(",")(0).toInt
}
 
  }/
 
  and driver code, please tell me what I am doing wrong
 
  /val conf = new SparkConf().setAppName("MapInheritanceExample")
  conf.set("spark.shuffle.manager", "SORT");
  val sc = new SparkContext(conf)
  val pF = sc.textFile(inputFile)
 
  val log = LogFactory.getLog("MapFunctionTest")
  val partitionedRDD = pF.map { x =>
 
  var arr = x.split("\t");
  (arr(0)+","+arr(1), null)
 
  }.partitionBy(new StraightPartitioner(10))
 
  var outputRDD = partitionedRDD.mapPartitions(p => {
    p.map({ case(o, n) => {
 o
  }
    })
  })/
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Koert Kuipers
yeah I think we have been lucky so far. but I don't really see how I have a
choice. it would be fine if, say, hadoop exposed a very small set of
libraries as part of the classpath. but if I look at the jars on the hadoop
classpath it's a ton! and why? why does parquet need to be included with
hadoop, for example? or avro? it just makes my life harder. and I don't
really see who benefits.

the yarn classpath is insane too.

On Wed, Feb 4, 2015 at 4:26 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Feb 4, 2015 at 1:12 PM, Koert Kuipers ko...@tresata.com wrote:
  about putting stuff on classpath before spark or yarn... yeah you can
 shoot
  yourself in the foot with it, but since the container is isolated it
 should
  be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with
 great
  success.

 The container still has to use Hadoop libraries, e.g., to talk to
 HDFS. If you override a library it needs with an incompatible one, you
 may break something. So maybe you've just been lucky. :-)

 In reality it should be pretty hard to cause breakages if you're
 careful - e.g., when you override a jar of some library that generally
 has multiple jars, such as Jackson, you need to include all of them,
 not just the one(s) you need in your app.

 MR also has an option similar to Spark's userClassPath (see
 https://issues.apache.org/jira/browse/MAPREDUCE-1700), which doesn't
 involve messing with the system's class path.
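 
 For reference, a hedged sketch of the Spark-side knobs (the property names
 differ across releases, so treat these as assumptions and check the docs for
 your version):
 
 // Prefer the application's jars over the cluster's when loading classes in
 // executors, without touching HADOOP_USER_CLASSPATH_FIRST. Which property
 // applies depends on the Spark release.
 val conf = new org.apache.spark.SparkConf()
   .setAppName("UserClassPathFirstSketch")
   .set("spark.files.userClassPathFirst", "true")       // experimental flag in older releases
   // .set("spark.executor.userClassPathFirst", "true")  // its successor in later releases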

 --
 Marcelo



Re: Spark streaming app shutting down

2015-02-04 Thread Dibyendu Bhattacharya
Thanks Akhil for mentioning this Low Level Consumer (
https://github.com/dibbhatt/kafka-spark-consumer ). Yes, it has a better
fault-tolerance mechanism than any existing Kafka consumer available. It has
no data loss on receiver failure and can recover or restart itself in case of
failure. You can definitely give it a try.

Dibyendu

On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 AFAIK, from Spark 1.2.0 you can have a WAL (Write Ahead Log) for fault
 tolerance, which means it can handle receiver/driver failures. You can
 also look at the low-level Kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer which has a better
 fault-tolerance mechanism for receiver failures. This low-level consumer
 will push the offset of the message being read into ZooKeeper for fault
 tolerance. In your case I think most of the in-flight data would be lost if
 you aren't using any of these fault-tolerance mechanisms.
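 
 A minimal sketch of enabling the WAL, assuming Spark 1.2+; the checkpoint
 path, batch interval, and app name are placeholders:
 
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 // Persist received blocks to a write ahead log under the checkpoint
 // directory so they can be replayed after a receiver/driver failure.
 val conf = new SparkConf()
   .setAppName("WALSketch")
   .set("spark.streaming.receiver.writeAheadLog.enable", "true")
 
 def createContext(): StreamingContext = {
   val ssc = new StreamingContext(conf, Seconds(60))
   ssc.checkpoint("hdfs:///tmp/spark/checkpoints")
   // ... set up the Kafka receiver and the rest of the pipeline here ...
   ssc
 }
 
 // On restart, rebuild the context (and unfinished batches) from the checkpoint.
 val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/checkpoints", createContext _)
 ssc.start()
 ssc.awaitTermination()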

 Thanks
 Best Regards

 On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Sparkans,

 I'm running a Spark Streaming app which reads data from a Kafka topic, does
 some processing, and then persists the results in HBase.

 I am using Spark 1.2.0 running on a YARN cluster with 3 executors (2 GB, 8
 cores each). I've enabled checkpointing and I am also rate limiting my
 kafkaReceivers so that the number of items read is not more than 10 records
 per sec.
 The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.

 This app was running fine for ~3 days, then there was an increased load on
 the HBase server because of some other process querying HBase tables.
 This led to an increase in the batch processing time of the Spark batches
 (a 1-min batch that previously finished in 20 sec took 10 min to process),
 which in turn led to the shutdown of the Spark application; PFA the
 executor logs.

 From the logs I'm getting the exceptions below, *[1]* and *[2]*; it looks like
 there were some outstanding jobs that didn't get processed, or the job
 couldn't find the input data. From the logs it also seems that the
 shutdown hook gets invoked but it cannot process the in-flight block.

 I have a couple of queries on this:
   1) Does this mean that these jobs failed and the *in-flight data* is
 lost?
   2) Does the Spark job *buffer kafka* input data while the job is in the
 processing state for 10 mins, and on shutdown is that too lost? (I do
 not see any OOM error in the logs.)
   3) Can we have *explicit commits* enabled in the kafkaReceiver so that
 the offsets get committed only when the RDD(s) are successfully processed?

 Also I'd like to know if there is a *graceful way to shut down a spark
 app running on yarn*. Currently I'm killing the YARN app to stop it,
 which leads to the loss of that job's history, whereas a graceful stop would
 let the application finish and succeed, preserving the logs & history.
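 
 As a hedged aside on that last point, assuming ssc is the app's
 StreamingContext, a programmatic graceful stop looks roughly like this:
 
 // Stop receiving new data, let already-received batches finish, then stop
 // the SparkContext so the application exits with a normal final status
 // instead of being killed via YARN.
 ssc.stop(stopSparkContext = true, stopGracefully = true)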

 *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still
 have 1 requests outstanding when connection from
 hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
 *[2]* java.lang.Exception: Could not compute split, block
 input-2-1422901498800 not found
 *[3]* 
 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode
 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63
 does not have any open files.

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org