Problem with changing the akka.framesize parameter
I am trying to run a Spark application with -Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000 -Dspark.akka.frameSize=1, and the job fails because one or more of the Akka frames are larger than 1mb (12000 ish). When I change spark.akka.frameSize from 1 to 12000, 15000, and 2, and run:

./spark/bin/spark-submit --driver-memory 30g --executor-memory 30g mySparkCode.py

I get an error at startup:

ERROR OneForOneStrategy: Cannot instantiate transport [akka.remote.transport.netty.NettyTransport]. Make sure it extends [akka.remote.transport.Transport] and has constructor with [akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters
java.lang.IllegalArgumentException: Cannot instantiate transport [akka.remote.transport.netty.NettyTransport]. Make sure it extends [akka.remote.transport.Transport] and has constructor with [akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters
        at akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)
        at akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Failure.recover(Try.scala:185)
        at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
        at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
        at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
        at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
        at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: requirement failed: Setting 'maximum-frame-size' must be at least 32000 bytes
        at scala.Predef$.require(Predef.scala:233)
        at akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)
        at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
        at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)
        at org.apache.spark.SparkContext.init(SparkContext.scala:203)
        at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:214)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

Can anyone give me a clue about what's going wrong here? I am running Spark 1.1.0 on r3.2xlarge EC2 instances.
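One plausible reading of the failure, sketched below in Scala: spark.akka.frameSize is given in MB, and judging from the AkkaUtils frame in the trace, Spark 1.1 multiplies it out to bytes in 32-bit Int arithmetic. Values like 12000 or 15000 then overflow to a negative number, which fails the "at least 32000 bytes" requirement; values up to about 2047 stay within Int range. This is the arithmetic only, not Spark's actual code:

    // sketch of the suspected Int overflow behind the "at least 32000 bytes" failure
    val frameSizeMb = 12000
    val frameSizeBytes = frameSizeMb * 1024 * 1024   // Int overflow: -301989888
    println(frameSizeBytes >= 32000)                 // false -> "requirement failed"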
Re: Spark Job running on localhost on yarn cluster
1) Parameters like --num-executors should come before the jar. That is, you want something like:

$SPARK_HOME/bin/spark-submit --num-executors 3 --driver-memory 6g --executor-memory 7g \
  --master yarn-cluster --class EDDApp target/scala-2.10/edd-application_2.10-1.0.jar \
  outputPath

That is, *your* parameters come after the jar, and Spark's parameters come *before* the jar. That's how spark-submit knows which are which (at least that is my understanding).

2) Double check that in your code, when you create the SparkContext or the configuration object, you don't set the master to local there. (I don't recall the exact order of priority if the parameters disagree with the code.)

Good luck!
-Mike

From: kundan kumar iitr.kun...@gmail.com
To: spark users user@spark.apache.org
Sent: Wednesday, February 4, 2015 7:41 AM
Subject: Spark Job running on localhost on yarn cluster

Hi, I am trying to execute my code on a yarn cluster. The command which I am using is

$SPARK_HOME/bin/spark-submit --class EDDApp target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g outputPath

But I can see that this program is running only on the localhost. It is able to read the file from hdfs. I have tried this in standalone mode and it works fine. Please suggest where it is going wrong.

Regards,
Kundan
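A minimal sketch of point 2, assuming a plain Scala app: leave the master unset in code so that the --master flag from spark-submit takes effect.

    import org.apache.spark.{SparkConf, SparkContext}

    // no setMaster("local[*]") here -- spark-submit's --master yarn-cluster decides
    val conf = new SparkConf().setAppName("EDDApp")
    val sc = new SparkContext(conf)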
Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
Hi Michael, judging from the logs, it seems that those tasks are just working for a really long time. If you have long-running tasks, then you wouldn't expect the driver to output anything while those tasks are working. What is unusual is that there is no activity during all that time the tasks are executing. Are you sure you are looking at the activity of the executors (the nodes that are actually running the tasks), and not the activity of the driver node (the node where your main program lives, but that doesn't do any of the distributed computation)? It would be perfectly normal for the driver node to be idle while all the executors were busy with long-running tasks.

I would look at:
(a) the cpu usage etc. of the executor nodes during those long running tasks
(b) the thread dumps of the executors during those long running tasks (available via the UI under the Executors tab, or just log into the boxes and run jstack). Ideally this will point out a hotspot in your code that is making these tasks take so long. (Or perhaps it'll point out what is going on in spark internals that is so slow.)
(c) the summary metrics for the long running stage, when it finally finishes (also available in the UI, under the Stages tab). You will get a breakdown of how much time is spent in various phases of the tasks, how much data is read, etc., which can help you figure out why tasks are slow.

Hopefully this will help you find out what is taking so long. If you find out the executors really aren't doing anything during these really long tasks, it would be great to find that out, and maybe get some more info for a bug report.

Imran

On Tue, Feb 3, 2015 at 6:18 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings! First, my sincere thanks to all who have given me advice. Following previous discussion, I've rearranged my code to try to keep the partitions to more manageable sizes. Thanks to all who commented. At the moment, the input set I'm trying to work with is about 90GB (avro parquet format). When I run on a reasonable chunk of the data (say half) things work reasonably. On the full data, the spark process stalls. That is, for about 1.5 hours out of a 3.5 hour run, I see no activity. No cpu usage, no error message, no network activity. It just seems to sit there. The messages bracketing the stall are shown below.

Any advice on how to diagnose this? I don't get any error messages. The spark UI says that it is running a stage, but it makes no discernible progress. Ganglia shows no CPU usage or network activity. When I shell into the worker nodes there are no filled disks or other obvious problems. How can I discern what Spark is waiting for?

The only weird thing seen, other than the stall, is that the yarn logs on the workers have lines with messages like this:

2015-02-03 22:59:58,890 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 13158 for container-id container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory used; 7.6 GB of 42.5 GB virtual memory used

It's rather strange that it mentions 42.5 GB of virtual memory. The machines are EMR machines with 32 GB of physical memory and, as far as I can determine, no swap space.

The messages bracketing the stall are shown below. Any advice is welcome. Thanks!

Sincerely,
Mike Albert
Before the stall:

15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6, Stage 7, Stage 8)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 6: List(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List(Stage 6)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 8: List(Stage 7)

At this point, I see no activity for 1.5 hours except for this (XXX for I.P. address):

15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor: akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

Then finally it started again:

15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 7300) in 7208503 ms on ip-10-171-0-128.ec2.internal (4/4)
15/02/03 23:31:34 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at Transposer.scala:211) finished in 7209.534 s
Re: “mapreduce.job.user.classpath.first” for Spark
Hi Corey, I see. Thanks for making it clear. I may be lucky not to have hit the code path of such Guava classes. But I hit some other jar conflicts when using Spark to connect to AWS S3; I had to manually try each version of org.apache.httpcomponents until I found a proper old version. Another suggestion is to build Spark by yourself. Anyway, I would like to see your update once you figure out the solution. Best wishes!

Bo

On Wed, Feb 4, 2015 at 4:47 AM, Corey Nolet cjno...@gmail.com wrote:

Bo Yang- I am using Spark 1.2.0, and undoubtedly there are older Guava classes which are being picked up and serialized with the closures when they are sent from the driver to the executors, because the class serial version ids don't match from the driver to the executors. Have you tried doing this? Guava works fine for me when this is not the case- but as soon as a Guava class that changed in version 15.0 is serialized, it fails. See [1] for more info- we did fairly extensive testing last night. I've isolated the issue to Hadoop's really old version of Guava being picked up. Again, this is only noticeable when classes are used from Guava 15.0 that were changed from previous versions and those classes are being serialized on the driver and shipped to the executors.

[1] https://github.com/calrissian/mango/issues/158

On Wed, Feb 4, 2015 at 1:31 AM, bo yang bobyan...@gmail.com wrote:

Corey, Which version of Spark do you use? I am using Spark 1.2.0, and guava 15.0. It seems fine. Best, Bo

On Tue, Feb 3, 2015 at 8:56 PM, M. Dale medal...@yahoo.com.invalid wrote:

Try spark.yarn.user.classpath.first (see https://issues.apache.org/jira/browse/SPARK-2996 - only works for YARN). Also see the thread at http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html . HTH, Markus

On 02/03/2015 11:20 PM, Corey Nolet wrote:

I'm having a really bad dependency conflict right now with Guava versions between my Spark application in Yarn and (I believe) Hadoop's version. The problem is, my driver has the version of Guava which my application is expecting (15.0) while it appears the Spark executors that are working on my RDDs have a much older version (assuming it's the old version on the Hadoop classpath). Is there a property like 'mapreduce.job.user.classpath.first' that I can set to make sure my own classpath is established first on the executors?
Re: 2GB limit for partitions?
That work is from more than a year back and is not maintained anymore, since we do not use it in-house now. Also note that there have been quite a lot of changes in Spark since then, including some which break assumptions made in the patch, so its value is very low. Having said that, do feel free to work on the jira and/or use the patch if it helps!

Regards, Mridul

On Wednesday, February 4, 2015, Imran Rashid iras...@cloudera.com wrote:

Hi Mridul, do you think you'll keep working on this, or should this get picked up by others? Looks like there was a lot of work put into LargeByteBuffer, seems promising. thanks, Imran

On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com wrote:

That is fairly out of date (we used to run some of our jobs on it ... But that is forked off 1.1 actually). Regards, Mridul

On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

Thanks for the explanations, makes sense. For the record, looks like this was worked on a while back (and maybe the work is even close to a solution?) https://issues.apache.org/jira/browse/SPARK-1476 and perhaps an independent solution was worked on here? https://issues.apache.org/jira/browse/SPARK-1391

On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

cc dev list. How are you saving the data? There are two relevant 2GB limits: 1. Caching 2. Shuffle. For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = number of reduce tasks. It is unlikely a shuffle block exceeds 2G, although it can still happen. I think the 2nd problem is easier to fix than the 1st, because we can handle that in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks.

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:

    import org.apache.spark.storage.StorageLevel
    val d = sc.parallelize(1 to 1e6.toInt, 1)
      .map { i => new Array[Byte](5e3.toInt) }
      .persist(StorageLevel.DISK_ONLY)
    d.count()

It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have.
Sincerely, Mike

Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Monday, February 2, 2015 10:13 PM
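A minimal sketch of the "smaller partitions" workaround Imran suggests, reusing his demo: spreading the same data over many partitions keeps each one well under the 2GB mapped-buffer limit (the 200 below is an arbitrary illustrative count).

    import org.apache.spark.storage.StorageLevel
    // same ~5 GB of data, but in 200 partitions of ~25 MB instead of one giant partition
    val d = sc.parallelize(1 to 1e6.toInt, 200)
      .map { i => new Array[Byte](5e3.toInt) }
      .persist(StorageLevel.DISK_ONLY)
    d.count()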
Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
Also, do you see any lines in the YARN NodeManager logs where it says that it's killing a container?

-Sandy

On Wed, Feb 4, 2015 at 8:56 AM, Imran Rashid iras...@cloudera.com wrote: [Imran's reply, quoted in full above]
Re: How many stages in my application?
You can easily understand the flow by looking at the operations in your program (like map, groupBy, join etc.). First list out the operations happening in your application, and then from the webui you will be able to see how many operations have happened so far.

Thanks
Best Regards

On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote:

I'm sitting here looking at my application crunching gigabytes of data on a cluster and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages are remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious?

Thanks!
Joe
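One note on the event-log question, with a hedged sketch: the event log is written as files under spark.eventLog.dir (for the web UI / history server to replay), not to STDOUT, which may be why no stage messages show up in the console. The directory below is an assumption; any local or HDFS path the driver can write to should work.

    import org.apache.spark.SparkConf

    // sketch: event logging needs a writable directory to produce anything visible
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "file:///tmp/spark-events")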
Re: ephemeral-hdfs vs persistent-hdfs - performance
Joe, I also use S3 and gzip. So far the I/O is not a problem. In my case, the operation is SQLContext.jsonFile() and I can see from Ganglia that the whole cluster is CPU bound (99% saturated). I have 160 cores and I can see the network can sustain about 150MBit/s.

Kelvin

On Wed, Feb 4, 2015 at 10:20 AM, Aaron Davidson ilike...@gmail.com wrote:

The latter would be faster. With S3, you want to maximize the number of concurrent readers until you hit your network throughput limits.

On Wed, Feb 4, 2015 at 6:20 AM, Peter Rudenko petro.rude...@gmail.com wrote:

Hi, if I have a 10GB file on s3 and set 10 partitions, would it download the whole file on the master first and broadcast it, or would each worker just read its range from the file? Thanks, Peter

On 2015-02-03 23:30, Sven Krasser wrote:

Hey Joe, With the ephemeral HDFS, you get the instance store of your worker nodes. For m3.xlarge that will be two 40 GB SSDs local to each instance, which are very fast. For the persistent HDFS, you get whatever EBS volumes the launch script configured. EBS volumes are always network drives, so the usual limitations apply. To optimize throughput, you can use EBS volumes with provisioned IOPS and you can use EBS-optimized instances. I don't have hard numbers at hand, but I'd expect this to be noticeably slower than using local SSDs. As far as only using S3 goes, it depends on your use case (i.e. what you plan on doing with the data while it is there). If you store it there in between running different applications, you can likely work around consistency issues. Also, if you use Amazon's EMRFS to access data in S3, you can use their new consistency feature ( https://aws.amazon.com/blogs/aws/emr-consistent-file-system/). Hope this helps!

-Sven

On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass jw...@crossref.org wrote:

The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persisted the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See Why you cannot use S3 as a replacement for HDFS [0]. I'd love to be proved wrong, though; that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3

On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote:

You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR

On 02/03/2015 11:43 AM, Joe Wass wrote:

I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has already been covered.
Joe
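A sketch of the behavior Aaron describes, with a hypothetical bucket and path: each of the 10 tasks reads only its own byte range of the object directly from S3, so nothing is funneled through the driver. This assumes a splittable format; a single gzip file cannot be split this way and ends up in one partition.

    // 10 input partitions -> up to 10 concurrent S3 range reads across the executors
    val lines = sc.textFile("s3n://my-bucket/input/data.txt", 10)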
Re: How many stages in my application?
But there isn't a 1-1 mapping from operations to stages, since multiple operations will be pipelined into a single stage if no shuffle is required. To determine the number of stages in a job you really need to be looking for shuffle boundaries.

On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can easily understand the flow by looking at the operations in your program (like map, groupBy, join etc.). First list out the operations happening in your application, and then from the webui you will be able to see how many operations have happened so far.

Thanks
Best Regards

On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote:

I'm sitting here looking at my application crunching gigabytes of data on a cluster and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages are remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious?

Thanks!
Joe
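A quick way to see those shuffle boundaries before running the full job, as a minimal sketch (myRdd standing in for the final RDD of the job):

    // toDebugString prints the RDD lineage; each new indentation level marks a
    // shuffle boundary, i.e. a stage
    println(myRdd.toDebugString)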
Re: Spark streaming app shutting down
AFAIK, from Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault tolerance, which means it can handle receiver/driver failures. You can also look at the low-level kafka consumer https://github.com/dibbhatt/kafka-spark-consumer which has a better fault tolerance mechanism for receiver failures. This low level consumer pushes the offset of the message being read into zookeeper for fault tolerance. In your case I think mostly the in-flight data would be lost if you aren't using any of the fault tolerance mechanisms.

Thanks
Best Regards

On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com wrote:

Hello Sparkans, I'm running a spark streaming app which reads data from a kafka topic, does some processing and then persists the results in HBase. I am using spark 1.2.0 running on a Yarn cluster with 3 executors (2gb, 8 cores each). I've enabled checkpointing and I am also rate limiting my kafkaReceivers so that the number of items read is not more than 10 records per sec. The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.

This app was running fine for ~3 days, then there was an increased load on the HBase server because of some other process querying HBase tables. This led to an increase in the batch processing time of the spark batches (processed 1 min batch in 10 min), which previously was finishing in 20 sec, which in turn led to the shutdown of the spark application; PFA the executor logs.

From the logs I'm getting the exceptions *[1]* *[2]* below; it looks like there were some outstanding jobs that didn't get processed, or the job couldn't find the input data. From the logs it seems that the shutdown hook gets invoked but it cannot process the in-flight block. I have a couple of queries on this:

1) Does this mean that these jobs failed and the *in-flight data* is lost?
2) Does the Spark job *buffer kafka* input data while the job is in a processing state for 10 mins, and on shutdown is that too lost? (I do not see any OOM error in the logs.)
3) Can we have *explicit commits* enabled in the kafkaReceiver so that the offsets get committed only when the RDD(s) get successfully processed?

Also I'd like to know if there is a *graceful way to shut down a spark app running on yarn*. Currently I'm killing the yarn app to stop it, which leads to loss of that job's history, whereas in this case the application stops and succeeds and thus preserves the logs history.

*[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
*[2]* java.lang.Exception: Could not compute split, block input-2-1422901498800 not found
*[3]* org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63 does not have any open files.

--
Thanks & Regards,
*Mukesh Jha me.mukesh@gmail.com*
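A minimal sketch of the two mechanisms mentioned above, assuming Spark 1.2, an existing SparkConf named sparkConf and a StreamingContext named ssc; the checkpoint path is hypothetical:

    // enable the receiver write ahead log (Spark 1.2 property name); received
    // blocks are then also persisted under the checkpoint directory
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

    // graceful shutdown: stop accepting new data and drain already-received data first
    ssc.stop(stopSparkContext = true, stopGracefully = true)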
Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
Greetings! Thanks to all who have taken the time to look at this. While the process is stalled, I see, in the yarn log on the head node, repeating messages of the form "Trying to fulfill reservation for application XXX on node YYY", but that node is reserved by XXX_01. Below is a chunk of the log. Any suggestions as to what I can investigate? Thanks!

-Mike

2015-02-04 18:18:28,617 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-124.ec2.internal:9103
2015-02-04 18:18:28,617 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-124.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true} type=OFF_SWITCH
2015-02-04 18:18:28,617 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_25 on node host: ip-10-171-0-124.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,618 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26 usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,618 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-124.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01
2015-02-04 18:18:28,623 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): default usedResources: memory:226304, vCores:26 clusterResources: memory:230400, vCores:160 currentCapacity 0.982 required memory:8704, vCores:1 potentialNewCapacity: 1.02 ( max-capacity: 1.0)
2015-02-04 18:18:28,625 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-119.ec2.internal:9103
2015-02-04 18:18:28,625 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-119.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true} type=OFF_SWITCH
2015-02-04 18:18:28,626 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_26 on node host: ip-10-171-0-119.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,626 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26 usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,626 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-119.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-129.ec2.internal:9103
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-129.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true}
Re: Parquet compression codecs not applied
I was using a hive context and not a sql context, therefore (SET spark.sql.parquet.compression.codec=gzip) was ignored. Michael Armbrust pointed out that parquet.compression should be used instead, which solved the issue. I am still wondering if this behavior is normal; it would be better if spark.sql.parquet.compression.codec were translated to parquet.compression in the case of a hive context. Otherwise the documentation should be updated to be more precise.

2015-02-04 19:13 GMT+01:00 sahanbull sa...@skimlinks.com:

Hi Ayoub, You could try using the sql format to set the compression type:

    sc = SparkContext()
    sqc = SQLContext(sc)
    sqc.sql("SET spark.sql.parquet.compression.codec=gzip")

You get a notification on screen while running the spark job when you set the compression codec like this. I haven't compared it with different compression methods; please let the mailing list know if this works for you.

Best
Sahan
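A sketch of the HiveContext variant that worked, per Michael Armbrust's pointer; the Parquet-level property applies instead of the spark.sql one:

    // with a HiveContext, set the Parquet property directly
    // (value could also be SNAPPY or UNCOMPRESSED)
    hiveContext.sql("SET parquet.compression=GZIP")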
Re: “mapreduce.job.user.classpath.first” for Spark
Hi Corey, When you run on Yarn, Yarn's libraries are placed in the classpath, and they have precedence over your app's. So, with Spark 1.2, you'll get Guava 11 in your classpath (with Spark 1.1 and earlier you'd get Guava 14 from Spark, so still a problem for you). Right now, the option Markus mentioned (spark.yarn.user.classpath.first) can be a workaround for you, since it will place your app's jars before Yarn's on the classpath.

On Tue, Feb 3, 2015 at 8:20 PM, Corey Nolet cjno...@gmail.com wrote:

I'm having a really bad dependency conflict right now with Guava versions between my Spark application in Yarn and (I believe) Hadoop's version. The problem is, my driver has the version of Guava which my application is expecting (15.0) while it appears the Spark executors that are working on my RDDs have a much older version (assuming it's the old version on the Hadoop classpath). Is there a property like 'mapreduce.job.user.classpath.first' that I can set to make sure my own classpath is established first on the executors?

--
Marcelo
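A minimal sketch of that workaround, set on the SparkConf before the context is created (the property only takes effect on YARN):

    // place the application's jars ahead of Yarn's/Spark's on the executor classpath
    val conf = new SparkConf().set("spark.yarn.user.classpath.first", "true")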
Re: “mapreduce.job.user.classpath.first” for Spark
the whole spark.files.userClassPathFirst never really worked for me in standalone mode, since jars were added dynamically, which means they had different classloaders, leading to a real classloader hell if you tried to add a newer version of a jar that spark already used. see: https://issues.apache.org/jira/browse/SPARK-1863

do i understand it correctly that on yarn the custom jars are truly placed before the yarn and spark jars on the classpath? meaning at container construction time, on the same classloader? that would be great news for me. it would open up the possibility of using newer versions of many libraries.

On Wed, Feb 4, 2015 at 1:12 PM, Marcelo Vanzin van...@cloudera.com wrote:

Hi Corey, When you run on Yarn, Yarn's libraries are placed in the classpath, and they have precedence over your app's. So, with Spark 1.2, you'll get Guava 11 in your classpath (with Spark 1.1 and earlier you'd get Guava 14 from Spark, so still a problem for you). Right now, the option Markus mentioned (spark.yarn.user.classpath.first) can be a workaround for you, since it will place your app's jars before Yarn's on the classpath.

On Tue, Feb 3, 2015 at 8:20 PM, Corey Nolet cjno...@gmail.com wrote:

I'm having a really bad dependency conflict right now with Guava versions between my Spark application in Yarn and (I believe) Hadoop's version. The problem is, my driver has the version of Guava which my application is expecting (15.0) while it appears the Spark executors that are working on my RDDs have a much older version (assuming it's the old version on the Hadoop classpath). Is there a property like 'mapreduce.job.user.classpath.first' that I can set to make sure my own classpath is established first on the executors?

--
Marcelo
Re: “mapreduce.job.user.classpath.first” for Spark
My mistake Marcelo, I was looking at the wrong message. That reply was meant for bo yang.

On Feb 4, 2015 4:02 PM, Marcelo Vanzin van...@cloudera.com wrote:

Hi Corey, On Wed, Feb 4, 2015 at 12:44 PM, Corey Nolet cjno...@gmail.com wrote: "Another suggestion is to build Spark by yourself. -- I'm having trouble seeing what you mean here, Marcelo. Guava is already shaded to a different package for the 1.2.0 release. It shouldn't be causing conflicts."

That wasn't my suggestion, and I definitely do not recommend rebuilding Spark to work around these issues. :-)

--
Marcelo
Re: How to get Hive table schema using Spark SQL or otherwise
sqlContext.table(tableName).schema

On Wed, Feb 4, 2015 at 1:07 PM, Ayoub benali.ayoub.i...@gmail.com wrote:

Given a hive context you could execute: hiveContext.sql("describe TABLE_NAME"). You would get the names of the fields and their types.

2015-02-04 21:47 GMT+01:00 nitinkak001 [hidden email]:

I want to get Hive table schema details into Spark. Specifically, I want to get column name and type information. Is it possible to do it e.g. using JavaSchemaRDD or something else?
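A short sketch expanding on that one-liner to print column names and types, assuming Spark 1.x and a hypothetical table name:

    // schema is a StructType; each StructField carries a column name and DataType
    val schema = sqlContext.table("my_table").schema
    schema.fields.foreach { f =>
      println(s"${f.name}: ${f.dataType}")
    }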
Large # of tasks in groupby on single table
Spark 1.2. Data is read from parquet with 2 partitions and is cached as a table with 2 partitions. Verified in the UI that it shows an RDD with 2 partitions, fully cached in memory. The cached data contains columns a, b, c. Column a has ~150 distinct values.

Next, run SQL on this table, such as: select a, sum(b), sum(c) from x group by a

The query creates 200 tasks. Further, the advanced metric "scheduler delay" is a significant percentage of time for most of these tasks. This seems like very high overhead for a query on an RDD with 2 partitions. It seems that if this ran with a smaller number of tasks, the query would run faster. Any thoughts on how to control the number of partitions for the group by (or other SQLs)?

Thanks,
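The 200 likely comes from spark.sql.shuffle.partitions, which defaults to 200 post-shuffle partitions for SQL aggregations and joins. A hedged sketch of lowering it for a small table, assuming a SQLContext named sqc:

    // use far fewer reduce-side tasks for a 2-partition, ~150-group aggregation
    sqc.setConf("spark.sql.shuffle.partitions", "8")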
Re: “mapreduce.job.user.classpath.first” for Spark
marcelo, i was not aware of those fixes. its a fulltime job to keep up with spark... i will take another look. it would be great if that works on spark standalone also and resolves the issues i experienced before.

about putting stuff on the classpath before spark or yarn... yeah you can shoot yourself in the foot with it, but since the container is isolated it should be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with great success. with the ability to put our own classes first, and with support for security, yarn now seems more attractive than standalone to me for many applications/situations. never thought i would say that. best

On Wed, Feb 4, 2015 at 4:01 PM, Marcelo Vanzin van...@cloudera.com wrote:

Hi Koert, On Wed, Feb 4, 2015 at 11:35 AM, Koert Kuipers ko...@tresata.com wrote: "do i understand it correctly that on yarn the custom jars are truly placed before the yarn and spark jars on classpath? meaning at container construction time, on the same classloader? that would be great news for me. it would open up the possibility of using newer versions of many libraries."

That's correct, the Yarn setting places the user's jars in the system classpath before Spark/Hadoop jars, so they can override classes needed by Spark/Hadoop. That's the main reason why it's not documented and not suggested unless there's no other workaround: because you're potentially overriding classes that might break Spark, Hadoop or something else that's packaged with those. But if it works for your case, that's great.

As for the userClassPath-first thing, I've made some changes to the class loaders as part of implementing that option for Yarn [1], and someone also made similar changes in isolation [2]. So maybe the issues you were running into are fixed by either of those? In the future, it would be great to be able to declare that feature stable, since I believe it's a better alternative to overriding libraries that Spark or Hadoop depend on.

[1] https://github.com/apache/spark/pull/3233
[2] https://github.com/apache/spark/pull/3725

--
Marcelo
Why are task results large in this case?
I am running a job, part of which is to add some null values to the rows of a SchemaRDD. The job fails with:

Total size of serialized results of 2692 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

This is the code:

    val in = sqc.parquetFile(...)
    ...
    val presentColProj: SchemaRDD = in.select(symbolList : _*)
    val nullSeq: Broadcast[Seq[_]] = sc.broadcast(Seq.fill(missingColNames.size)(null))
    val nullPaddedProj: RDD[Row] = presentColProj.map { row =>
      Row.fromSeq(Row.unapplySeq(row).get ++ nullSeq.value)
    }
    ...
    sqc.applySchema(nullPaddedProj, newSchema)

I believe it is failing on the map. Is the size of the serialized result large because of the rows in the map? Is there a better way to add some null columns to a SchemaRDD? Any insight would be appreciated.
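If the job simply has many small task results (2692 tasks at roughly 400 KB each would cross the 1 GB cap), one hedged workaround is to raise the limit before creating the context:

    import org.apache.spark.SparkConf

    // raise the cap on total serialized task results sent to the driver
    // ("0" removes the limit entirely, at the risk of driver OOM)
    val conf = new SparkConf().set("spark.driver.maxResultSize", "2g")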
Re: “mapreduce.job.user.classpath.first” for Spark
On Wed, Feb 4, 2015 at 1:12 PM, Koert Kuipers ko...@tresata.com wrote: "about putting stuff on classpath before spark or yarn... yeah you can shoot yourself in the foot with it, but since the container is isolated it should be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with great success."

The container still has to use Hadoop libraries, e.g., to talk to HDFS. If you override a library it needs with an incompatible one, you may break something. So maybe you've just been lucky. :-) In reality it should be pretty hard to cause breakage if you're careful - e.g., when you override a jar of some library that generally has multiple jars, such as Jackson, you need to include all of them, not just the one(s) you need in your app. MR also has an option similar to Spark's userClassPath (see https://issues.apache.org/jira/browse/MAPREDUCE-1700), which doesn't involve messing with the system's class path.

--
Marcelo
Re: Spark streaming - tracking/deleting processed files
Thank you very much for the detailed answer. I feel a little dumb asking, but how would that work when using Scala (we use Spark 1.0.2)? I can not figure it out. E.g. I am having trouble using UnionPartition and NewHadoopPartition, and even ds.values() is not an option for me (in the IDE). Do you have any Scala code that does something similar? Any help is appreciated. BTW: I am creating the dStream like this:

    val ds = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), f, true).map(_._2.toString)

Thanks,
Markus

On Tue, Feb 3, 2015 at 4:55 AM, Prannoy [via Apache Spark User List] ml-node+s1001560n21478...@n3.nabble.com wrote:

Hi, To keep processing the older files also, you can use fileStream instead of textFileStream; it has a parameter to specify to look for already-present files. For deleting the processed files, one way is to get the list of all files in the dStream. This can be done by using the foreachRDD api of the dStream received from the fileStream (or textFileStream). Suppose the dStream is:

    JavaDStream<String> jpDstream = ssc.textFileStream("path/to/your/folder/");
    jpDstream.print();
    jpDstream.foreachRDD(
      new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> arg0) throws Exception {
          getContentHigh(arg0, ssc);
          return null;
        }
      }
    );

    public static <U> void getContentHigh(JavaRDD<String> ds, JavaStreamingContext ssc) {
      // the number of partitions equals the number of files the stream picked up
      Partition[] listPartitions = ds.rdd().partitions();
      int lenPartition = listPartitions.length;
      for (int i = 0; i < lenPartition; i++) {
        UnionPartition upp = (UnionPartition) listPartitions[i];
        NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
        String fPath = npp.serializableHadoopSplit().value().toString();
        String[] nT = fPath.split(":");
        // name is the path of the file picked for processing; the processing logic
        // can go inside this loop, and once done you can delete the file at 'name'
        String name = nT[0];
      }
    }

Thanks.

On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List] wrote:

We are running a Spark streaming job that retrieves files from a directory (using textFileStream). One concern we are having is the case where the job is down but files are still being added to the directory. Once the job starts up again, those files are not being picked up (since they are not new or changed while the job is running) but we would like them to be processed. Is there a solution for that? Is there a way to keep track of what files have been processed, and can we force older files to be picked up? Is there a way to delete the processed files? Thanks! Markus
Re: “mapreduce.job.user.classpath.first” for Spark
I have never been a big fan of shading, since it leads to the same library being packaged many times. For example, it's not unusual to have ASM 10 times in a jar because of the shading policy they promote, and all that because they broke one signature, without a really good reason. I am a big fan of maintaining backwards compatibility, and when you don't, change the Maven artifact and Java package, like Jackson did when they went from 1 to 2. Now that makes sense :) You can even have Jackson 1 and 2 together within the same project without any issues. On Wed, Feb 4, 2015 at 3:44 PM, Corey Nolet cjno...@gmail.com wrote: leading to a real classloader hell if you tried to add a newer version of a jar that spark already used. Spark at least makes this easier on the Guava side [1] by shading the package names internally so that there's no possibility of a conflict. Elasticsearch and Storm do this too for many of their dependencies, and I think it's a great practice for libraries that are only used internally, specifically when those internal libraries are not exposed at all to the outside. If you are only using said libraries internally, that strategy may work for you as well, Koert. I'm going to ask about this on the Hadoop list as well to see if maybe there was a decision against it for reasons I haven't thought of. Another suggestion is to build Spark by yourself. I'm having trouble seeing what you mean here, Marcelo. Guava is already shaded to a different package for the 1.2.0 release. It shouldn't be causing conflicts. [1] https://issues.apache.org/jira/browse/SPARK-2848 On Wed, Feb 4, 2015 at 2:35 PM, Koert Kuipers ko...@tresata.com wrote: The whole spark.files.userClassPathFirst never really worked for me in standalone mode, since jars were added dynamically, which means they had different classloaders, leading to a real classloader hell if you tried to add a newer version of a jar that spark already used. See: https://issues.apache.org/jira/browse/SPARK-1863 Do I understand it correctly that on YARN the custom jars are truly placed before the YARN and Spark jars on the classpath, meaning at container construction time, in the same classloader? That would be great news for me. It would open up the possibility of using newer versions of many libraries. On Wed, Feb 4, 2015 at 1:12 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Corey, when you run on YARN, YARN's libraries are placed in the classpath, and they have precedence over your app's. So, with Spark 1.2, you'll get Guava 11 in your classpath (with Spark 1.1 and earlier you'd get Guava 14 from Spark, so still a problem for you). Right now, the option Markus mentioned (spark.yarn.user.classpath.first) can be a workaround for you, since it will place your app's jars before Yarn's on the classpath. On Tue, Feb 3, 2015 at 8:20 PM, Corey Nolet cjno...@gmail.com wrote: I'm having a really bad dependency conflict right now with Guava versions between my Spark application in Yarn and (I believe) Hadoop's version. The problem is, my driver has the version of Guava which my application is expecting (15.0) while it appears the Spark executors that are working on my RDDs have a much older version (assuming it's the old version on the Hadoop classpath). Is there a property like 'mapreduce.job.user.classpath.first' that I can set to make sure my own classpath is established first on the executors? -- Marcelo
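For reference, a minimal sketch of the workaround mentioned above, assuming the experimental spark.yarn.user.classpath.first flag from the Spark 1.2 era (flag name per SPARK-2996; the app name is illustrative):

val conf = new org.apache.spark.SparkConf()
  .setAppName("guava15-app")
  // experimental: load the application's jars before YARN's/Spark's copies
  .set("spark.yarn.user.classpath.first", "true")
val sc = new org.apache.spark.SparkContext(conf)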
Re: “mapreduce.job.user.classpath.first” for Spark
Hi Corey, On Wed, Feb 4, 2015 at 12:44 PM, Corey Nolet cjno...@gmail.com wrote: Another suggestion is to build Spark by yourself. I'm having trouble seeing what you mean here, Marcelo. Guava is already shaded to a different package for the 1.2.0 release. It shouldn't be causing conflicts. That wasn't my suggestion, and I definitely do not recommend rebuilding Spark to work around these issues. :-) -- Marcelo
Re: spark with cdh 5.2.1
Yep... it was unnecessary to create a 2.5 profile. I struggled a bit because it wasn't clear that I *need* to select a profile using the -P option. I didn't have to do that for earlier Hadoop versions. On Fri, Jan 30, 2015 at 12:11 AM, Sean Owen so...@cloudera.com wrote: There is no need for a 2.5 profile. The hadoop-2.4 profile is for Hadoop 2.4 and beyond. You can set the particular version you want with -Dhadoop.version=... You do not need to make any new profile to compile vs 2.5.0-cdh5.2.1. Again, the hadoop-2.4 profile is what you need. On Thu, Jan 29, 2015 at 11:33 PM, Mohit Jaggi mohitja...@gmail.com wrote: Hi All, I noticed in pom.xml that there is no entry for Hadoop 2.5. Has anyone tried Spark with 2.5.0-cdh5.2.1? Will replicating the 2.4 entry be sufficient to make this work? Mohit.
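For reference, a hedged example of the invocation Sean describes (flag names per the Spark 1.2 build documentation; adjust for your checkout):

mvn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.2.1 -DskipTests clean package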
Re: Spark SQL taking long time to print records from a table
Many operations in Spark are lazy -- most likely your collect() statement is actually forcing evaluation of several steps earlier in the pipeline. The logs and the UI might give you some info about all the stages that are being run when you get to collect(). I think collect() is just fine if you are trying to pull just one record to the driver; that shouldn't be a bottleneck. On Wed, Feb 4, 2015 at 1:32 AM, jguliani jasminkguli...@gmail.com wrote: I have 3 text files in HDFS which I am reading using Spark SQL and registering them as tables. After that I am doing almost 5-6 operations, including joins, group by, etc. And this whole process takes hardly 6-7 secs (source file size: 3 GB with almost 20 million rows). As a final step of my computation, I am expecting only 1 record in my final RDD, named acctNPIScr in the code snippet below. My question is that when I try to print this RDD, either by registering it as a table and printing records from the table, or by this method: acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println), it takes a very long time, almost 1.5 minutes, to print 1 record. Can someone please help me if I am doing something wrong in printing? What is the best way to print the final result from a SchemaRDD?

val acctNPIScr = sqlContext.sql("SELECT party_id, sum(npi_int)/sum(device_priority_new) as npi_score FROM AcctNPIScoreTemp group by party_id")
acctNPIScr.registerTempTable("AcctNPIScore")
val endtime = System.currentTimeMillis()
logger.info("Total sql Time : " + (endtime - st)) // this time is hardly 5 secs
println("start printing")
val result = sqlContext.sql("SELECT * FROM AcctNPIScore").collect().foreach(println)
//acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println)
logger.info("Total printing Time : " + (System.currentTimeMillis() - endtime)) // printing one record takes almost 1.5 minutes
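One way to see where the time actually goes, a minimal sketch against the Spark 1.2 SchemaRDD API: force the whole lazy pipeline once with count(), then measure the fetch separately.

val t0 = System.currentTimeMillis()
acctNPIScr.cache()
val n = acctNPIScr.count() // forces all the lazy upstream stages to run
val t1 = System.currentTimeMillis()
acctNPIScr.collect().foreach(println) // now only fetches the cached result
val t2 = System.currentTimeMillis()
println(s"compute: ${t1 - t0} ms, fetch: ${t2 - t1} ms for $n rows")

If nearly all the time lands in the compute leg, the 1.5 minutes was never about printing; it was the joins and aggregations upstream finally executing.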
Tableau beta connector
Hi, I am trying out the Tableau beta connector to Spark SQL. I have a few basic questions: Will this connector be able to fetch the schemaRDDs into Tableau? Will all the schemaRDDs be exposed to Tableau? Basically, I am not getting what Tableau will fetch as the data source: is it existing files in HDFS, RDDs, or something else? The question may be naive, but I did not get an answer anywhere else. I would really appreciate it if someone who has already tried it can help me with this. Thanks, Ashutosh
New combination-like RDD based on two RDDs
Hey Spark gurus! Sorry for the confusing title; I do not know the exact description of my problem. If you know, please tell me so I can change it :-) Say I have two RDDs right now, and they are

val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6))))
val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9))))

I want to combine rdd1 and rdd2 to get rdd3, which looks like

List((1,(3)), (2,(5,1)), (2,(5,3)), (3,(6,9)))

The order in _._2 does not matter, so you can treat it as a Set. I tried to use zip, but since there is no guarantee that the lengths of rdd1 and rdd2 will be the same, I do not know if it is doable. Also, I looked into PairRDDFunctions; some people use the union operation on two RDDs and then apply a map function. Since I want all combinations according to _._1, I do not know how to achieve it by union and map. Thanks in advance!
Re: Large # of tasks in groupby on single table
Follow-up for closure on this thread... 1. spark.sql.shuffle.partitions is not on the configuration page but is mentioned on http://spark.apache.org/docs/1.2.0/sql-programming-guide.html. It would be better to have it on the configuration page as well for the sake of completeness. Should I file a doc bug? 2. Regarding my #2 above (Spark should auto-determine the # of tasks), there is already a write-up on the SQL programming guide page, under the Hive optimizations not yet in Spark: "Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using 'SET spark.sql.shuffle.partitions=[num_tasks];'." Any idea if and when this is scheduled? Even a rudimentary implementation (e.g., based on the # of partitions of the underlying RDD, which is available now) would be an improvement over the current fixed 200 and would be a critical feature for Spark SQL feasibility. Thanks On Wed, Feb 4, 2015 at 4:09 PM, Manoj Samel manojsamelt...@gmail.com wrote: Awesome! By setting this, I could minimize the collect overhead, e.g., by setting it to the # of partitions of the RDD. Two questions: 1. I had looked for such an option in http://spark.apache.org/docs/latest/configuration.html but it is not documented. Seems this is a doc bug? 2. Ideally the shuffle partitions should be derived from the underlying table(s), and an optimal number should be set for each query. Having one number across all queries is not ideal, nor does the consumer want to set it to a different # before each query. Any thoughts? Thanks! On Wed, Feb 4, 2015 at 3:50 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi Manoj, You can set the number of partitions you want your SQL query to use. By default it is 200, and thus you see that number. You can update it using the spark.sql.shuffle.partitions property: spark.sql.shuffle.partitions (default 200): Configures the number of partitions to use when shuffling data for joins or aggregations. Thanks Ankur On Wed, Feb 4, 2015 at 3:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, Any thoughts on this? Most of the 200 tasks in the collect phase take less than 20 ms, and a lot of time is spent on scheduling these. I suspect the overall time will drop if the # of tasks is reduced to a much smaller # (close to the # of partitions?). Any help is appreciated. Thanks On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2. Data is read from parquet with 2 partitions and is cached as a table with 2 partitions. Verified in the UI that it shows the RDD with 2 partitions and that it is fully cached in memory. The cached data contains columns a, b, c. Column a has ~150 distinct values. Next I run SQL on this table: select a, sum(b), sum(c) from x. The query creates 200 tasks. Further, the advanced metric scheduler delay is a significant % for most of these tasks. This seems like very high overhead for a query on an RDD with 2 partitions. It seems that if this were run with a smaller number of tasks, the query would run faster? Any thoughts on how to control the # of partitions for the group by (or other SQLs)? Thanks,
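A minimal sketch of setting the property from code, assuming a Spark 1.2 SQLContext (setConf is also available on HiveContext; the group by is added to make the aggregate valid SQL):

// lower the post-shuffle parallelism to something close to the input's
// 2 partitions before running the aggregation
sqlContext.setConf("spark.sql.shuffle.partitions", "4")
val agg = sqlContext.sql("select a, sum(b), sum(c) from x group by a")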
synchronously submitting spark jobs
Hi, I would like to submit Spark jobs one by one, in that the next job will not be submitted until the previous one succeeds. spark-submit can only submit jobs asynchronously. Is there any way I can submit jobs sequentially? Thanks. Ey-Chih Chow
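A rough sketch of one approach, assuming client-mode submission, where spark-submit blocks until the application finishes and returns a nonzero exit code on failure (in yarn-cluster mode the exit code may not reflect the application's final status, so verify before relying on it); the job list is illustrative:

import scala.sys.process._

val jobs = Seq("job1.py", "job2.py")
jobs.foreach { job =>
  // run spark-submit synchronously; ! returns the process exit code
  val exit = Seq("./spark/bin/spark-submit", job).!
  require(exit == 0, s"$job failed with exit code $exit; stopping the chain")
}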
Re: Large # of tasks in groupby on single table
Hi Manoj, You can set the number of partitions you want your SQL query to use. By default it is 200, and thus you see that number. You can update it using the spark.sql.shuffle.partitions property: spark.sql.shuffle.partitions (default 200): Configures the number of partitions to use when shuffling data for joins or aggregations. Thanks Ankur On Wed, Feb 4, 2015 at 3:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, Any thoughts on this? Most of the 200 tasks in the collect phase take less than 20 ms, and a lot of time is spent on scheduling these. I suspect the overall time will drop if the # of tasks is reduced to a much smaller # (close to the # of partitions?). Any help is appreciated. Thanks On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2. Data is read from parquet with 2 partitions and is cached as a table with 2 partitions. Verified in the UI that it shows the RDD with 2 partitions and that it is fully cached in memory. The cached data contains columns a, b, c. Column a has ~150 distinct values. Next I run SQL on this table: select a, sum(b), sum(c) from x. The query creates 200 tasks. Further, the advanced metric scheduler delay is a significant % for most of these tasks. This seems like very high overhead for a query on an RDD with 2 partitions. It seems that if this were run with a smaller number of tasks, the query would run faster? Any thoughts on how to control the # of partitions for the group by (or other SQLs)? Thanks,
Re: “mapreduce.job.user.classpath.first” for Spark
Anyhow, I am ranting... sorry. On Wed, Feb 4, 2015 at 5:54 PM, Koert Kuipers ko...@tresata.com wrote: Yeah, I think we have been lucky so far. But I don't really see how I have a choice. It would be fine if, say, Hadoop exposed a very small set of libraries as part of the classpath, but if I look at the jars on the Hadoop classpath, it's a ton! And why? Why does parquet need to be included with Hadoop, for example? Or Avro? It just makes my life harder, and I don't really see who benefits. The YARN classpath is insane too. On Wed, Feb 4, 2015 at 4:26 PM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Feb 4, 2015 at 1:12 PM, Koert Kuipers ko...@tresata.com wrote: about putting stuff on classpath before spark or yarn... yeah you can shoot yourself in the foot with it, but since the container is isolated it should be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with great success. The container still has to use Hadoop libraries, e.g., to talk to HDFS. If you override a library it needs with an incompatible one, you may break something. So maybe you've just been lucky. :-) In reality it should be pretty hard to cause breakages if you're careful - e.g., when you override a jar of some library that generally has multiple jars, such as Jackson, you need to include all of them, not just the one(s) you need in your app. MR also has an option similar to Spark's userClassPath (see https://issues.apache.org/jira/browse/MAPREDUCE-1700), which doesn't involve messing with the system's class path. -- Marcelo
Re: Large # of tasks in groupby on single table
Hi, Any thoughts on this? Most of the 200 tasks in the collect phase take less than 20 ms, and a lot of time is spent on scheduling these. I suspect the overall time will drop if the # of tasks is reduced to a much smaller # (close to the # of partitions?). Any help is appreciated. Thanks On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2. Data is read from parquet with 2 partitions and is cached as a table with 2 partitions. Verified in the UI that it shows the RDD with 2 partitions and that it is fully cached in memory. The cached data contains columns a, b, c. Column a has ~150 distinct values. Next I run SQL on this table: select a, sum(b), sum(c) from x. The query creates 200 tasks. Further, the advanced metric scheduler delay is a significant % for most of these tasks. This seems like very high overhead for a query on an RDD with 2 partitions. It seems that if this were run with a smaller number of tasks, the query would run faster? Any thoughts on how to control the # of partitions for the group by (or other SQLs)? Thanks,
Re: Problem with changing the akka.framesize parameter
The unit of spark.akka.frameSize is MB. The max value is 2047. Best Regards, Shixiong Zhu 2015-02-05 1:16 GMT+08:00 sahanbull sa...@skimlinks.com: I am trying to run a spark application with -Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000 -Dspark.akka.frameSize=1 and the job fails because one or more of the akka frames are larger than 1mb (12000 ish). When I change the Dspark.akka.frameSize=1 to 12000,15000 and 2 and RUN: ./spark/bin/spark-submit --driver-memory 30g --executor-memory 30g mySparkCode.py I get an error in the startup as: ERROR OneForOneStrategy: Cannot instantiate transport [akka.remote.transport.netty.NettyTransport]. ... Caused by: java.lang.IllegalArgumentException: requirement failed: Setting 'maximum-frame-size' must be at least 32000 bytes ...
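A minimal sketch of setting the frame size within its valid range. The value is interpreted as megabytes, and Spark converts it to a 32-bit byte count, which would explain the error above: values like 12000 or 15000 MB overflow that count, tripping the "at least 32000 bytes" check even though the setting looks large. Property names per Spark 1.2; the chosen value is illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  // frameSize is in MB and must stay at or below 2047
  .set("spark.akka.frameSize", "1024")
  .set("spark.kryoserializer.buffer.max.mb", "2000")
val sc = new SparkContext(conf)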
Re: Problem with changing the akka.framesize parameter
Could you clarify why you need a 10G akka frame size? Best Regards, Shixiong Zhu 2015-02-05 9:20 GMT+08:00 Shixiong Zhu zsxw...@gmail.com: The unit of spark.akka.frameSize is MB. The max value is 2047. Best Regards, Shixiong Zhu 2015-02-05 1:16 GMT+08:00 sahanbull sa...@skimlinks.com: I am trying to run a spark application with -Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000 -Dspark.akka.frameSize=1 and the job fails because one or more of the akka frames are larger than 1mb (12000 ish). ...
Is there a way to access Hive UDFs in a HiveContext?
I'm trying to access a permanent Hive UDF:

scala> val data = hc.sql("select func.md5(some_string) from some_table")
data: org.apache.spark.sql.SchemaRDD = SchemaRDD[19] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
java.lang.RuntimeException: Couldn't find function func.md5

Version: Spark 1.2 on CDH 5.3.
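One workaround worth trying, sketched below with an assumed UDF class name: Spark SQL in this era resolves functions through the session's registry, so registering the function as a temporary one, without the database qualifier, sometimes works where the permanent func.md5 lookup fails:

// 'com.example.hive.udf.Md5' is a placeholder; use the class that backs func.md5
hc.sql("CREATE TEMPORARY FUNCTION md5 AS 'com.example.hive.udf.Md5'")
val data = hc.sql("select md5(some_string) from some_table")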
Re: New combination-like RDD based on two RDDs
Problem solved. A simple join will do the work:

val prefix = new PairRDDFunctions[Int, Set[Int]](sc.parallelize(List((9, Set(4)), (1, Set(3)), (2, Set(5)), (2, Set(4)))))
val suffix = sc.parallelize(List((1, Set(1)), (2, Set(6)), (2, Set(5)), (2, Set(7))))
prefix.join(suffix).collect().foreach(println)
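One caveat, sketched below: join is an inner join, so keys present in only one RDD are dropped, yet the expected output in the original question keeps (1,(3)) even though key 1 never appears in rdd2. leftOuterJoin preserves such keys:

val rdd1 = sc.parallelize(List((1, 3), (2, 5), (3, 6)))
val rdd2 = sc.parallelize(List((2, 1), (2, 3), (3, 9)))
// unmatched keys come back with None on the right instead of vanishing
val rdd3 = rdd1.leftOuterJoin(rdd2).map {
  case (k, (v1, Some(v2))) => (k, (v1, v2))
  case (k, (v1, None))     => (k, v1)
}
rdd3.collect().foreach(println) // (1,3), (2,(5,1)), (2,(5,3)), (3,(6,9)); order may vary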
pyspark - gzip output compression
How do I save an RDD with gzip compression? Thanks.
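A minimal sketch in Scala: saveAsTextFile has an overload that takes a Hadoop compression codec. (PySpark's saveAsTextFile also accepts a codec class name via a compressionCodecClass parameter in the 1.x API, if I recall correctly, so the same codec string should work from Python.)

import org.apache.hadoop.io.compress.GzipCodec

// each output part file is written gzip-compressed
rdd.saveAsTextFile("hdfs:///output/path", classOf[GzipCodec])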
Re: Tableau beta connector
Thanks Denny and Ismail. Denny, I went through your blog; it was a great help. I guess the Tableau beta connector also follows the same procedure you described in the blog. I am building Spark now. Basically, what I don't get is where to put my data so that Tableau can extract it. So Ismail, it's just Spark SQL, no RDDs. I think I am getting it now. We use Spark for our big data processing, and we want the processed data (RDD) in Tableau. So we should put our data in the Hive metastore, and Tableau will extract it from there using this connector? Correct me if I am wrong. I guess I have to look at how the thrift server works. From: Denny Lee denny.g@gmail.com Sent: Thursday, February 5, 2015 12:20 PM To: İsmail Keskin; Ashutosh Trivedi (MT2013030) Cc: user@spark.apache.org Subject: Re: Tableau beta connector Some quick context behind how Tableau interacts with Spark / Hive can also be found at https://www.concur.com/blog/en-us/connect-tableau-to-sparksql - it's about how to connect from Tableau to the thrift server before the official Tableau beta connector, but it should provide some of the additional context called out. HTH! On Wed Feb 04 2015 at 10:47:23 PM İsmail Keskin ismail.kes...@dilisim.com wrote: Tableau connects to the Spark Thrift Server via an ODBC driver. So, none of the RDD stuff applies; you just issue SQL queries from Tableau. The table metadata can come from the Hive Metastore if you place your hive-site.xml in the configuration directory of Spark. On Thu, Feb 5, 2015 at 8:11 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I am trying out the Tableau beta connector to Spark SQL. I have a few basic questions: Will this connector be able to fetch the schemaRDDs into Tableau? Will all the schemaRDDs be exposed to Tableau? Basically, I am not getting what Tableau will fetch as the data source: is it existing files in HDFS, RDDs, or something else? The question may be naive, but I did not get an answer anywhere else. I would really appreciate it if someone who has already tried it can help me with this. Thanks, Ashutosh
Re: Tableau beta connector
Tableau connects to the Spark Thrift Server via an ODBC driver. So, none of the RDD stuff applies; you just issue SQL queries from Tableau. The table metadata can come from the Hive Metastore if you place your hive-site.xml in the configuration directory of Spark. On Thu, Feb 5, 2015 at 8:11 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I am trying out the Tableau beta connector to Spark SQL. I have a few basic questions: Will this connector be able to fetch the schemaRDDs into Tableau? Will all the schemaRDDs be exposed to Tableau? Basically, I am not getting what Tableau will fetch as the data source: is it existing files in HDFS, RDDs, or something else? The question may be naive, but I did not get an answer anywhere else. I would really appreciate it if someone who has already tried it can help me with this. Thanks, Ashutosh
Re: Spark Job running on localhost on yarn cluster
Is YARN_CONF_DIR set? --- Original Message --- From: Aniket Bhatnagar aniket.bhatna...@gmail.com Sent: February 4, 2015 6:16 AM To: kundan kumar iitr.kun...@gmail.com, spark users user@spark.apache.org Subject: Re: Spark Job running on localhost on yarn cluster Have you set the master in SparkConf/SparkContext in your code? The driver logs show in which mode the spark job is running; double-check whether the logs mention local or yarn-cluster. Also, what's the error that you are getting? On Wed, Feb 4, 2015, 6:13 PM kundan kumar iitr.kun...@gmail.com wrote: Hi, I am trying to execute my code on a yarn cluster. The command which I am using is $SPARK_HOME/bin/spark-submit --class EDDApp target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath But I can see that this program is running only on the localhost. It's able to read the file from hdfs. I have tried this in standalone mode and it works fine. Please suggest where it is going wrong. Regards, Kundan
Re: Tableau beta connector
The context is that you would create your RDDs and then persist them in Hive. Once in Hive, the data is accessible from the Tableau extract through the Spark thrift server. On Wed, Feb 4, 2015 at 23:36 Ashutosh Trivedi (MT2013030) ashutosh.triv...@iiitb.org wrote: Thanks Denny and Ismail. Denny, I went through your blog; it was a great help. I guess the Tableau beta connector also follows the same procedure you described in the blog. I am building Spark now. Basically, what I don't get is where to put my data so that Tableau can extract it. So Ismail, it's just Spark SQL, no RDDs. I think I am getting it now. We use Spark for our big data processing, and we want the *processed data (RDD)* in Tableau. So we should put our data in the Hive metastore, and Tableau will extract it from there using this connector? Correct me if I am wrong. I guess I have to look at how the thrift server works. -- *From:* Denny Lee denny.g@gmail.com *Sent:* Thursday, February 5, 2015 12:20 PM *To:* İsmail Keskin; Ashutosh Trivedi (MT2013030) *Cc:* user@spark.apache.org *Subject:* Re: Tableau beta connector Some quick context behind how Tableau interacts with Spark / Hive can also be found at https://www.concur.com/blog/en-us/connect-tableau-to-sparksql - it's about how to connect from Tableau to the thrift server before the official Tableau beta connector, but it should provide some of the additional context called out. HTH! On Wed Feb 04 2015 at 10:47:23 PM İsmail Keskin ismail.kes...@dilisim.com wrote: Tableau connects to the Spark Thrift Server via an ODBC driver. So, none of the RDD stuff applies; you just issue SQL queries from Tableau. The table metadata can come from the Hive Metastore if you place your hive-site.xml in the configuration directory of Spark. On Thu, Feb 5, 2015 at 8:11 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I am trying out the Tableau beta connector to Spark SQL. I have a few basic questions: Will this connector be able to fetch the schemaRDDs into Tableau? Will all the schemaRDDs be exposed to Tableau? Basically, I am not getting what Tableau will fetch as the data source: is it existing files in HDFS, RDDs, or something else? The question may be naive, but I did not get an answer anywhere else. I would really appreciate it if someone who has already tried it can help me with this. Thanks, Ashutosh
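A minimal sketch of the persist-to-Hive step Denny describes, assuming a HiveContext on Spark 1.2 (table, column, and query names are illustrative):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
val processed = hc.sql("SELECT id, score FROM source_table") // your processed schemaRDD
// materialize the result as a Hive table; the thrift server (and hence
// Tableau) can then see it through the shared metastore
processed.saveAsTable("processed_for_tableau")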
Re: Whether standalone spark support kerberos?
Hope someone can help me. Thanks. On Wed, Feb 4, 2015 at 6:14 PM, Jander g jande...@gmail.com wrote: We have a standalone spark cluster for a kerberos test. But when reading from hdfs, I get the error output: Can't get Master Kerberos principal for use as renewer. So, does standalone spark support kerberos? Can anyone confirm it, or tell me what I missed? Thanks in advance. -- Thanks, Jander -- Thanks, Jander
How many stages in my application?
I'm sitting here looking at my application crunching gigabytes of data on a cluster and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious? Thanks! Joe
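The total number of stages genuinely isn't known up front: the DAGScheduler only plans stages when an action submits a job, so the UI can only show progress within the jobs it has already seen. A sketch of counting stages programmatically with a SparkListener (the same event stream the web UI consumes), which can be run against a small test dataset to get the stage count for one pass:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

sc.addSparkListener(new SparkListener {
  @volatile var submitted = 0
  @volatile var completed = 0
  override def onStageSubmitted(s: SparkListenerStageSubmitted): Unit = {
    submitted += 1
    println(s"stage submitted: ${s.stageInfo.name} (total submitted: $submitted)")
  }
  override def onStageCompleted(s: SparkListenerStageCompleted): Unit = {
    completed += 1
    println(s"stages completed: $completed")
  }
})

Provided the structure of the job does not depend on the data size, the count from the test run is a reasonable estimate of the stages a full run will take.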
Joining piped RDDs
Hi! We have a common use case with Spark: we go out to some database, e.g. Cassandra, and crunch through all of its data, but along the RDD pipeline we use a pipe operator to some script. All the data before the pipe has unique IDs, but inside the pipe everything is lost. The only current solution we have is to format the data going into the pipe so it includes the IDs, and then restore it all in a map after the pipe. It would be much nicer if we could just join/zip back the output of the pipe. We can't cache the RDDs, though, so it would be nice to have a forkRDD of some sort that only keeps the last partition in cache (since we're guaranteed that there'll be a zip later on and the dataflow will be synchronized). Or maybe we can already do this in Spark? Thank you, Pavel Velikhov Chief Science Officer TopRater
Re: Exception in thread main java.lang.SecurityException: class javax.servlet.ServletRegistration'
It means you did not exclude the Servlet APIs from some dependency in your app, and one of them is bringing it in every time. Look at the dependency tree and exclude whatever brings in javax.servlet. It should already be available in Spark, and the particular javax.servlet JAR from Oracle has signing info that you have to strip out, or simply exclude the whole thing. On Wed, Feb 4, 2015 at 1:20 AM, DEVAN M.S. msdeva...@gmail.com wrote: Hi all, I need some help. When I try to run a spark project it shows: Exception in thread main java.lang.SecurityException: class javax.servlet.ServletRegistration's signer information does not match signer information of other classes in the same package. After deleting the folder /home/devan/.ivy2/cache/javax.servlet, things are working... I don't know what happened. Please help, because on each restart the same folder comes back. Found this: http://stackoverflow.com/questions/2877262/java-securityexception-signer-information-does-not-match Which one causes the conflict? These are the libraries I am using:

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.1.0"
libraryDependencies += "com.googlecode.json-simple" % "json-simple" % "1.1.1"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.1.0"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.1.0"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.1.0"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-mqtt_2.10" % "1.1.0"
libraryDependencies += "org.tachyonproject" % "tachyon" % "0.5.0"
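A sketch of the exclusion in sbt. hadoop-client is a plausible carrier of the Servlet API here, but confirm with the dependency tree before pinning the blame on it:

// drop every javax.servlet artifact this dependency drags in
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0" excludeAll
  ExclusionRule(organization = "javax.servlet")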
Spark Job running on localhost on yarn cluster
Hi, I am trying to execute my code on a yarn cluster. The command which I am using is $SPARK_HOME/bin/spark-submit --class EDDApp target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath But I can see that this program is running only on the localhost. It's able to read the file from hdfs. I have tried this in standalone mode and it works fine. Please suggest where it is going wrong. Regards, Kundan
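One thing worth checking in the command above: spark-submit treats everything after the application jar as arguments to the application itself, so --master yarn-cluster placed after the jar never reaches Spark, and the job falls back to running locally. A corrected invocation would put all Spark flags before the jar:

$SPARK_HOME/bin/spark-submit --class EDDApp --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g target/scala-2.10/edd-application_2.10-1.0.jar outpuPath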
Re: “mapreduce.job.user.classpath.first” for Spark
Bo Yang - I am using Spark 1.2.0, and undoubtedly there are older Guava classes which are being picked up and serialized with the closures when they are sent from the driver to the executors, because the class serial version ids don't match between the driver and the executors. Have you tried doing this? Guava works fine for me when this is not the case, but as soon as a Guava class that changed in version 15.0 is serialized, it fails. See [1] for more info; we did fairly extensive testing last night. I've isolated the issue to Hadoop's really old version of Guava being picked up. Again, this is only noticeable when classes are used from Guava 15.0 that were changed from previous versions, and those classes are being serialized on the driver and shipped to the executors. [1] https://github.com/calrissian/mango/issues/158 On Wed, Feb 4, 2015 at 1:31 AM, bo yang bobyan...@gmail.com wrote: Corey, which version of Spark do you use? I am using Spark 1.2.0 and Guava 15.0. It seems fine. Best, Bo On Tue, Feb 3, 2015 at 8:56 PM, M. Dale medal...@yahoo.com.invalid wrote: Try spark.yarn.user.classpath.first (see https://issues.apache.org/jira/browse/SPARK-2996 - only works for YARN). Also see the thread at http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html . HTH, Markus On 02/03/2015 11:20 PM, Corey Nolet wrote: I'm having a really bad dependency conflict right now with Guava versions between my Spark application in Yarn and (I believe) Hadoop's version. The problem is, my driver has the version of Guava which my application is expecting (15.0) while it appears the Spark executors that are working on my RDDs have a much older version (assuming it's the old version on the Hadoop classpath). Is there a property like 'mapreduce.job.user.classpath.first' that I can set to make sure my own classpath is established first on the executors?
Re: Multiple running SparkContexts detected in the same JVM!
It means what it says. You should not have multiple SparkContexts running in one JVM. It was always the wrong thing to do, but now it is an explicit error. When you run spark-shell, you already have a SparkContext (sc), so there is no need to make another one. Just don't do that. On Wed, Feb 4, 2015 at 12:20 AM, gavin zhang gavin@gmail.com wrote: I have a cluster running CDH5.1.0 with the Spark component. Because the default version of Spark from CDH5.1.0 is 1.0.0 while I want to use some features of Spark 1.2.0, I compiled another Spark with Maven. But when I ran spark-shell and created a new SparkContext, I got the error below: 15/02/04 14:08:19 WARN SparkContext: Multiple running SparkContexts detected in the same JVM! org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at ... And I tried to delete the default Spark and set the spark.driver.allowMultipleContexts=true option, but it didn't work. How can I fix it?
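A sketch of the shell-friendly pattern: build everything on the context the shell already created instead of constructing a second one.

// in spark-shell, sc already exists; reuse it
val rdd = sc.parallelize(1 to 10)
// contexts for SQL etc. wrap the existing sc rather than creating a new one
val sqlContext = new org.apache.spark.sql.SQLContext(sc)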
Problems with GC and time to execute with different number of executors.
I execute a job in Spark where I'm processing a file of 80Gb in HDFS. I have 5 slaves: (32 cores / 256Gb / 7 physical disks) x 5. I have been trying many different configurations with YARN: yarn.nodemanager.resource.memory-mb 196Gb, yarn.nodemanager.resource.cpu-vcores 24. I have tried to execute the job with different numbers of executors and memory settings (1-4g). With 20 executors, each iteration (128mb) takes 25s and it never spends a really long time waiting because of GC. When I execute around 60 executors, the processing time is about 45s and some tasks take up to one minute because of GC. I have no idea why GC kicks in when I execute more executors simultaneously. The other question is why it takes more time to execute each block. My theory about this is that there are only 7 physical disks, and 20 processes writing is not the same as 5. The code is pretty simple; it's just a map function which parses a line and writes the output to HDFS. There are a lot of substring operations inside the function, which could cause GC. Any theories?
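A small sketch for narrowing this down: turn on GC logging in the executors via spark.executor.extraJavaOptions, so the executor logs show whether the pauses are minor or full collections and how often they fire; the flag set here is illustrative, not a recommendation:

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
// the GC lines then appear in each executor's stdout/stderr in the YARN logs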
Whether standalone spark support kerberos?
We have a standalone spark cluster for kerberos test. But when reading from hdfs, i get error output: Can't get Master Kerberos principal for use as renewer. So Whether standalone spark support kerberos? can anyone confirm it? or what i missed? Thanks in advance. -- Thanks, Jander
Re: 2GB limit for partitions?
Hi Mridul, do you think you'll keep working on this, or should this get picked up by others? Looks like there was a lot of work put into LargeByteBuffer; seems promising. Thanks, Imran On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com wrote: That is fairly out of date (we used to run some of our jobs on it... but that is forked off 1.1 actually). Regards Mridul On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote: Thanks for the explanations, makes sense. For the record, it looks like this was worked on a while back (and maybe the work is even close to a solution?) https://issues.apache.org/jira/browse/SPARK-1476 and perhaps an independent solution was worked on here? https://issues.apache.org/jira/browse/SPARK-1391 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote: cc dev list. How are you saving the data? There are two relevant 2GB limits: 1. Caching 2. Shuffle. For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = number of reduce tasks. It is unlikely a shuffle block is > 2G, although it can still happen. I think the 2nd problem is easier to fix than the 1st, because we can handle that in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map { i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have.
Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition,
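A sketch of the workaround suggested in this thread: keep each partition (and hence each cached block) well under 2 GB by raising the partition count before persisting. The size estimate and the target per-partition size here are assumed values you would supply, and bigRdd stands in for the oversized RDD:

import org.apache.spark.storage.StorageLevel

val totalBytes = 80L * 1024 * 1024 * 1024          // assumed input size estimate
val targetPartitions = math.max(
  (totalBytes / (512L * 1024 * 1024)).toInt,       // aim for ~512 MB per partition
  sc.defaultParallelism)
val resized = bigRdd.repartition(targetPartitions)
resized.persist(StorageLevel.DISK_ONLY)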
Re: Sort based shuffle not working properly?
I think you are interested in secondary sort, which is still being worked on: https://issues.apache.org/jira/browse/SPARK-3655 On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote: I thought that's what sort-based shuffle did: sort the keys going to the same partition. I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think the ordering of c2's type is the problem here. On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote: Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1, c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right? On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement secondary sort in spark as we do in map-reduce. Here is my data (tab separated; the c1, c2, c3 header is shown for reference and is not in the file):

c1 c2 c3
1 2 4
1 3 6
2 4 7
2 6 8
3 5 5
3 1 8
3 2 0

To do the secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT, so the keys per partition are sorted. For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5) but am still getting the original order 3,5 3,1 3,2. Here is the custom partitioner code:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = {
    key.asInstanceOf[String].split(",")(0).toInt
  }
}

and the driver code; please tell me what I am doing wrong:

val conf = new SparkConf().setAppName("MapInheritanceExample")
conf.set("spark.shuffle.manager", "SORT")
val sc = new SparkContext(conf)
val pF = sc.textFile(inputFile)
val log = LogFactory.getLog("MapFunctionTest")
val partitionedRDD = pF.map { x =>
  val arr = x.split("\t")
  (arr(0) + "," + arr(1), null)
}.partitionBy(new StraightPartitioner(10))
var outputRDD = partitionedRDD.mapPartitions(p => {
  p.map { case (o, n) => o }
})
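For what it's worth, Spark 1.2 (possibly earlier) added repartitionAndSortWithinPartitions, which gives a reliable version of this pattern without relying on the shuffle manager setting. A sketch under the same data layout: partition on c1 only, but sort by the full (c1, c2) composite key.

import org.apache.spark.Partitioner

class C1Partitioner(val numPartitions: Int) extends Partitioner {
  // partition on c1 only, so all rows for a given c1 land together
  def getPartition(key: Any): Int = {
    val c1 = key.asInstanceOf[(Int, Int)]._1
    math.abs(c1.hashCode) % numPartitions
  }
}

val pairs = sc.textFile(inputFile).map { line =>
  val a = line.split("\t")
  ((a(0).toInt, a(1).toInt), line)
}
// the implicit Ordering[(Int, Int)] sorts by c1 then c2 inside each partition
val sorted = pairs.repartitionAndSortWithinPartitions(new C1Partitioner(10))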
Re: “mapreduce.job.user.classpath.first” for Spark
Yeah, I think we have been lucky so far. But I don't really see how I have a choice. It would be fine if, say, Hadoop exposed a very small set of libraries as part of the classpath, but if I look at the jars on the Hadoop classpath, it's a ton! And why? Why does parquet need to be included with Hadoop, for example? Or Avro? It just makes my life harder, and I don't really see who benefits. The YARN classpath is insane too. On Wed, Feb 4, 2015 at 4:26 PM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Feb 4, 2015 at 1:12 PM, Koert Kuipers ko...@tresata.com wrote: about putting stuff on classpath before spark or yarn... yeah you can shoot yourself in the foot with it, but since the container is isolated it should be ok, no? we have been using HADOOP_USER_CLASSPATH_FIRST forever with great success. The container still has to use Hadoop libraries, e.g., to talk to HDFS. If you override a library it needs with an incompatible one, you may break something. So maybe you've just been lucky. :-) In reality it should be pretty hard to cause breakages if you're careful - e.g., when you override a jar of some library that generally has multiple jars, such as Jackson, you need to include all of them, not just the one(s) you need in your app. MR also has an option similar to Spark's userClassPath (see https://issues.apache.org/jira/browse/MAPREDUCE-1700), which doesn't involve messing with the system's class path. -- Marcelo
Re: Spark streaming app shutting down
Thanks Akhil for mentioning this Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer). Yes, it has a better fault-tolerance mechanism than any existing Kafka consumer available: it has no data loss on receiver failure and has the ability to replay or restart itself in case of failure. You can definitely give it a try. Dibyendu On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: AFAIK, from Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault tolerance, which means it can handle receiver/driver failures. You can also look at the low-level kafka consumer https://github.com/dibbhatt/kafka-spark-consumer which has a better fault-tolerance mechanism for receiver failures. This low-level consumer will push the offset of the message being read into zookeeper for fault tolerance. In your case, I think the in-flight data would mostly be lost if you aren't using any of the fault-tolerance mechanisms. Thanks Best Regards On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Sparkans, I'm running a spark streaming app which reads data from a kafka topic, does some processing and then persists the results in HBase. I am using spark 1.2.0 running on a Yarn cluster with 3 executors (2gb, 8 cores each). I've enabled checkpointing, and I am also rate-limiting my kafkaReceivers so that the number of items read is not more than 10 records per sec. The kafkaReceiver I'm using is *not* ReliableKafkaReceiver. This app was running fine for ~3 days, then there was an increased load on the HBase server because of some other process querying HBase tables. This led to an increase in the batch processing time of the spark batches (a 1 min batch processed in 10 min), which previously was finishing in 20 sec, which in turn led to the shutdown of the spark application; PFA the executor logs. From the logs I'm getting the exceptions *[1]* *[2]* below; it looks like there were some outstanding jobs that didn't get processed, or the job couldn't find the input data. From the logs it seems that the shutdown hook gets invoked but it cannot process the in-flight block. I have a couple of queries on this: 1) Does this mean that these jobs failed and the *in-flight data* is lost? 2) Does the Spark job *buffer kafka* input data while the job is in a processing state for 10 mins, and on shutdown is that lost too? (I do not see any OOM error in the logs.) 3) Can we have *explicit commits* enabled in the kafkaReceiver so that the offsets get committed only when the RDD(s) are successfully processed? Also, I'd like to know if there is a *graceful way to shut down a spark app running on yarn*. Currently I'm killing the yarn app to stop it, which leads to the loss of that job's history, whereas in this case the application stops and succeeds and thus preserves the logs history. *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed *[2]* java.lang.Exception: Could not compute split, block input-2-1422901498800 not found *[3]* org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63 does not have any open files. -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com*
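A minimal sketch of enabling the receiver WAL that Akhil mentions, assuming Spark 1.2 property names; checkpointing must also be configured, since the journaled blocks live in the checkpoint directory (the app name, batch interval, and path are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-hbase-app")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(60))
// received blocks are journaled here before being acknowledged
ssc.checkpoint("hdfs:///checkpoints/kafka-hbase-app")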