Help connecting to the cluster
Hi Spark users, could someone help me out? My company has a fully functioning Spark cluster with Shark running on top of it (as part of the same cluster, on the same LAN). I'm interested in running raw Spark code against it but am running into the following issue: it seems like the machine hosting the driver program needs to be reachable by the worker nodes (in my case the workers cannot route to the machine hosting the driver). Below is a snippet from my worker log:

14/03/03 20:45:28 INFO executor.StandaloneExecutorBackend: Connecting to driver: akka://spark@driver_ip:49081/user/StandaloneScheduler
14/03/03 20:45:29 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.

Does this sound right? It's not clear to me why a worker would try to establish a connection to the driver -- the driver already connected successfully, as I see the program listed in the log. Why is this connection not sufficient? If you use Amazon EC2, can you run the driver from your personal machine, or do you have to install an IDE on one of the Amazon machines in order to debug code? I am not too excited about the EC2 option as our data is proprietary...but if that's the shortest path to success it would at least get me started on some toy examples. At the moment I'm not sure what my options are, other than running a VM cluster or EC2. Any help/insight would be greatly appreciated.
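For context, the executors do need to open a connection back to the driver (the driver's scheduler is what hands them tasks), so the driver's host and port must be routable from the workers. A minimal sketch of pinning them explicitly, assuming a Spark 0.9-style SparkConf; the master URL, host, and port below are placeholders, not values from the original post:

    import org.apache.spark.{SparkConf, SparkContext}

    // The host must be an address the worker nodes can actually route to,
    // and fixing the port makes it possible to open it in a firewall.
    val conf = new SparkConf()
      .setMaster("spark://cluster-master:7077")
      .setAppName("RawSparkJob")
      .set("spark.driver.host", "10.0.0.5")
      .set("spark.driver.port", "49081")
    val sc = new SparkContext(conf)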
Re: Spark stand alone cluster mode
Does "sbt show full-classpath" show spark-core on the classpath? I am still pretty new to Scala, but it seems like you have val sparkCore = "org.apache.spark" %% "spark-core" % V.spark % "provided" -- I believe the "provided" part means the dependency is assumed to already be on your classpath at runtime (supplied by the environment), so plain "sbt run" will not put spark-core on the classpath for you (a minimal build.sbt sketch without the "provided" scope follows after the quoted message). The spark-shell script sets up a lot of this for you, so... On Tue, Mar 11, 2014 at 9:02 AM, Gino Mathews gin...@thinkpalm.com wrote: Hi, I am new to Spark. I would like to run jobs in Spark standalone cluster mode; no cluster manager other than Spark is used (https://spark.apache.org/docs/0.9.0/spark-standalone.html). I have tried wordcount from spark-shell and as a standalone Scala app. The code reads input from HDFS, writes the results to HDFS, and uses 2 worker nodes. In spark-shell the wordcount is successful; however, my efforts to run standalone programs are in vain. My environment: Ubuntu 12.04 (32 bit), Java 1.7.0_51. I have installed Spark at $HOME/Downloads/spark-0.9.0-incubating, installed Hadoop 2.2.0 as a separate hduser and given permission to other users, installed Scala 2.10.3, and installed sbt 0.13.1. The Spark master acts as the HDFS master; I have one master and 2 worker nodes, and HDFS is accessible from all nodes. I downloaded an example project and modified it to use my Spark cluster. I started the Spark cluster at spark://192.168.0.138:7077 and hdfs://master:9000/. When I run the project as SPARK_HADOOP_VERSION=2.2.0 sbt run, I get the following error:

gino@master:~/Test/spark-example-project$ SPARK_HADOOP_VERSION=2.2.0 sbt run
[info] Loading project definition from /home/gino/Test/spark-example-project/project
[info] Set current project to spark-example-project (in build file:/home/gino/Test/spark-example-project/)
[info] Running com.Thinkpalm.spark.WordCountHDFS
[error] (run-main-0) java.lang.NoClassDefFoundError: org/apache/spark/SparkContext
java.lang.NoClassDefFoundError: org/apache/spark/SparkContext
at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkContext
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
[trace] Stack trace suppressed: run last compile:run for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 0 s, completed Mar 11, 2014 2:54:54 PM

Could anyone give some pointers? I have attached the project for reference.
Thanks and regards Gino Mathews
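For reference, a minimal build definition for launching such a standalone app with plain "sbt run" would not mark spark-core as "provided". This is only a sketch, assuming Spark 0.9.0-incubating and Scala 2.10.3 as in the setup described above; the project name and versions are illustrative:

    // build.sbt -- sketch; spark-core is NOT scoped as "provided", so `sbt run` can see SparkContext
    name := "spark-example-project"

    version := "0.1"

    scalaVersion := "2.10.3"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

    // only needed because the app reads from and writes to HDFS (Hadoop 2.2.0 here)
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

(If the jar is later submitted to a cluster that already ships the Spark classes, switching spark-core back to "provided" for the assembly build is the usual pattern.)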
Re: quick start guide: building a standalone scala program
I am able to run standalone apps. I think you are making one mistake that throws you off from there onwards: you don't need to put your app under SPARK_HOME. I would create it in its own folder somewhere; it follows the rules of any standalone Scala program (including the layout). In the guide, $SPARK_HOME is only relevant for finding the README file which they are parsing/word-counting. Otherwise the compile-time dependencies on Spark are resolved via the sbt file (or the pom file if you look at the Java example). So for example I put my app under C:\Source\spark-code and the jar gets created in C:\Source\spark-code\target\scala-2.9.3 (or 2.10 if you're running with Scala 2.10 as the example shows). But for that part of the guide, it's not any different from building any Scala app. On Mon, Mar 24, 2014 at 3:44 PM, Diana Carroll dcarr...@cloudera.com wrote: Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious! I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala 1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt, so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.) 2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copy/paste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my Spark home path. 3. Create $SPARK_HOME/mysparktest/simple.sbt. Copy/paste in the sbt file from the instructions. 4. From $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end, it says Done packaging. Unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there. (Just for fun, I also did what I thought was more logical, which is to set my working directory to $SPARK_HOME/mysparktest and run $SPARK_HOME/sbt/sbt package, but that was even less successful. I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ So, help? I'm sure these instructions work because people are following them every day, but I can't tell what they are supposed to do. Thanks! Diana
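For reference, the standalone app from that section of the quick start is approximately the following (a sketch of the 0.9/1.0-era guide, not a verbatim copy; the jar name and Scala version depend on your build, and YOUR_SPARK_HOME is a placeholder):

    /* SimpleApp.scala -- approximate quick-start example */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "YOUR_SPARK_HOME/README.md"   // any sizable text file works
        val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
          List("target/scala-2.10/simple-project_2.10-1.0.jar"))
        val logData = sc.textFile(logFile, 2).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
      }
    }

The point of the discussion in this thread is that this file and its simple.sbt live in their own project directory, not under the Spark distribution.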
Re: quick start guide: building a standalone scala program
Diana, I think you are correct - I just downloaded http://mirror.symnds.com/software/Apache/incubator/spark/spark-0.9.0-incubating/spark-0.9.0-incubating-bin-cdh4.tgz (via wget) and indeed I see the same error that you see. It looks like in previous versions sbt-launch used to just come down in the package, but now they try to fetch it for you -- and that code seems to have some assumptions about where it is being invoked from. On Mon, Mar 24, 2014 at 5:47 PM, Diana Carroll dcarr...@cloudera.com wrote: Thanks for your help, everyone. Several folks have explained that I can surely solve the problem by installing sbt. But I'm trying to get the instructions working as written on the Spark website. The instructions not only don't have you install sbt separately...they actually specifically have you use the sbt that is distributed with Spark. If it is not possible to build your own Spark programs with Spark-distributed sbt, then that's a big hole in the Spark docs that I shall file. And if the sbt that is included with Spark is MEANT to be able to compile your own Spark apps, then that's a product bug. But before I file the bug, I'm still hoping I'm missing something, and someone will point out that I'm missing a small step that will make the Spark distribution of sbt work! Diana On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8 (since, like other folks, I had sbt preinstalled on my usual machine). I ran the command exactly as Ognen suggested and see "Set current project to Simple Project" (do you see this -- you should at least be seeing this) and then a bunch of "Resolving ..." messages. I did get an error there, saying it can't find javax.servlet.orbit. I googled the error and found this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E adding the IvyXML fragment they suggested helped in my case (but again, the build pretty clearly complained). If you're still having no luck, I suggest installing sbt and setting SBT_HOME... http://www.scala-sbt.org/ In either case though, it's not a Spark-specific issue...Hopefully some of this helps. On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com wrote: Yeah, that's exactly what I did. Unfortunately it doesn't work: $SPARK_HOME/sbt/sbt package awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with Spark. For example, try: ~/path_to_spark/sbt/sbt compile ~/path_to_spark/sbt/sbt run <arguments> Or you can just add that to your PATH with: export PATH=$PATH:~/path_to_spark/sbt To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ognen. Unfortunately I'm not able to follow your instructions either.
In particular: sbt compile, then sbt run <arguments, if any>. This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt. I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. Diana On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Diana, Anywhere on the filesystem where you have read/write access (you need not be in your Spark home directory):

mkdir myproject
cd myproject
mkdir project
mkdir target
mkdir -p src/main/scala
cp $mypath/$mymysource.scala src/main/scala/
cp $mypath/myproject.sbt .

Make sure that myproject.sbt has the following in it:

name := "I NEED A NAME!"

version := "I NEED A VERSION!"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating"

If you will be using Hadoop/HDFS functionality, you will need the line below as well:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately.
Re: Writing RDDs to HDFS
Ognen, can you comment on whether you were actually able to run two jobs concurrently with just restricting spark.cores.max? I run Shark on the same cluster and was not able to see a standalone job get in (since Shark is a long-running job) until I restricted both spark.cores.max _and_ spark.executor.memory. Just curious if I did something wrong. On Mon, Mar 24, 2014 at 7:48 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Just so I can close this thread (in case anyone else runs into this stuff) - I did sleep through the basics of Spark ;). The answer on why my job is in waiting state (hanging) is here: http://spark.incubator.apache.org/docs/latest/spark-standalone.html#resource-scheduling Ognen On 3/24/14, 5:01 PM, Diana Carroll wrote: Ognen: I don't know why your process is hanging, sorry. But I do know that the way saveAsTextFile works is that you give it a path to a directory, not a file. The file is saved in multiple parts, corresponding to the partitions (part-0, part-1, etc.). (Presumably it does this because it allows each partition to be saved on the local disk, to minimize network traffic. It's how Hadoop works, too.) On Mon, Mar 24, 2014 at 5:00 PM, Ognen Duzlevski og...@nengoiksvelzud.com wrote: Is someRDD.saveAsTextFile("hdfs://ip:port/path/final_filename.txt") supposed to work? Meaning, can I save files to the HDFS fs this way? I tried: val r = sc.parallelize(List(1,2,3,4,5,6,7,8)) and r.saveAsTextFile("hdfs://ip:port/path/file.txt") and it is just hanging. At the same time on my HDFS it created file.txt, but as a directory which has subdirectories (the final one is empty). Thanks! Ognen -- A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable -- Leslie Lamport
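A minimal sketch of the point Diana makes above: saveAsTextFile takes a directory path and writes one part file per partition into it. The HDFS URI below is a placeholder:

    val r = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
    // Pass a directory, not a file name; Spark creates part-00000, part-00001, ... inside it.
    r.saveAsTextFile("hdfs://namenode:9000/user/ognen/numbers_out")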
Re: Distributed running in Spark Interactive shell
Nan (or anyone who feels they understand the cluster architecture well), can you clarify something for me? From reading this user group and your explanation above, it appears that the cluster master is only involved during application startup -- to allocate executors (from what you wrote, it sounds like the driver itself passes the job/tasks to the executors). From there onwards all computation is done on the executors, which communicate results directly to the driver if certain actions (say collect) are performed. Is that right? The only description of the cluster I've seen came from here: https://spark.apache.org/docs/0.9.0/cluster-overview.html but that picture suggests there is no direct communication between driver and executors, which I believe is wrong (unless I am misreading the picture -- I believe Master and Cluster Manager refer to the same thing?). The very short form of my question is: does the master do anything other than executor allocation? On Wed, Mar 26, 2014 at 9:23 AM, Nan Zhu zhunanmcg...@gmail.com wrote: The only thing you need to do is ensure your Spark cluster is running well (you can check by accessing the Spark UI to see if all workers are displayed); then you have to set the correct SPARK_MASTER_IP on the machine where you run spark-shell. In more detail: when you run bin/spark-shell, it starts the driver program on that machine, which interacts with the Master to start the application (in this case, spark-shell); the Master tells Workers to start executors for your application, the executors try to register with your driver, and then your driver can distribute tasks to the executors, i.e. run in a distributed fashion. Best, -- Nan Zhu On Wednesday, March 26, 2014 at 9:01 AM, Sai Prasanna wrote: Nan Zhu, it's the latter, I want to distribute the tasks to the cluster [machines available.] If I set the SPARK_MASTER_IP at the other machines and set the slaves' IPs in /conf/slaves at the master node, will the interactive shell code run at the master get distributed across multiple machines ??? On Wed, Mar 26, 2014 at 6:32 PM, Nan Zhu zhunanmcg...@gmail.com wrote: What do you mean by run across the cluster? Do you want to start the spark-shell across the cluster, or do you want to distribute tasks to multiple machines? If the former, yes, as long as you indicate the right master URL. If the latter, also yes; you can observe the distributed tasks in the Spark UI. -- Nan Zhu On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote: Is it possible to run across a cluster using the Spark Interactive Shell? To be more explicit, is the procedure similar to running a standalone master-slave Spark setup? I want to execute my code in the interactive shell at the master node, and it should run across the cluster [say 5 nodes]. Is the procedure similar ??? -- *Sai Prasanna. AN* *II M.Tech (CS), SSSIHL* *Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.* -- *Sai Prasanna. AN* *II M.Tech (CS), SSSIHL* *Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.*
Re: Calling Spark enthusiasts in NYC
Nicholas, I'm in Boston and would be interested in a Spark group. Not sure if you know this -- there was a meetup that never got off the ground. Anyway, I'd be +1 for attending. Not sure what is involved in organizing. Seems a shame that a city like Boston doesn't have one. On Mon, Mar 31, 2014 at 2:02 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: As in, I am interested in helping organize a Spark meetup in the Boston area. On Mon, Mar 31, 2014 at 2:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Well, since this thread has played out as it has, lemme throw in a shout-out for Boston. On Mon, Mar 31, 2014 at 1:52 PM, Chris Gore cdg...@cdgore.com wrote: We'd love to see a Spark user group in Los Angeles and connect with others working with it here. Ping me if you're in the LA area and use Spark at your company ( ch...@retentionscience.com ). Chris Retention Science call: 734.272.3099 visit: Site | like: Facebook | follow: Twitter On Mar 31, 2014, at 10:42 AM, Anurag Dodeja anu...@anuragdodeja.com wrote: How about Chicago? On Mon, Mar 31, 2014 at 12:38 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Montreal or Toronto? On Mon, Mar 31, 2014 at 1:36 PM, Martin Goodson mar...@skimlinks.com wrote: How about London? -- Martin Goodson | VP Data Science (0)20 3397 1240 On Mon, Mar 31, 2014 at 6:28 PM, Andy Konwinski andykonwin...@gmail.com wrote: Hi folks, We have seen a lot of community growth outside of the Bay Area and we are looking to help spur even more! For starters, the organizers of the Spark meetups here in the Bay Area want to help anybody that is interested in setting up a meetup in a new city. Some amazing Spark champions have stepped forward in Seattle, Vancouver, Boulder/Denver, and a few other areas already. Right now, we are looking to connect with you Spark enthusiasts in NYC about helping to run an inaugural Spark Meetup in your area. You can reply to me directly if you are interested and I can tell you about all of the resources we have to offer (speakers from the core community, a budget for food, help scheduling, etc.), and let's make this happen! Andy
Re: Sample Project for using Shark API in Spark programs
I might be wrong here, but I don't believe it's discouraged. Maybe part of the reason there are not a lot of examples is that sql2rdd returns an RDD (a TableRDD, that is: https://github.com/amplab/shark/blob/master/src/main/scala/shark/SharkContext.scala). I haven't done anything too complicated yet, but my impression is that almost any Spark example of manipulating RDDs should apply from that line onwards. Are you asking for samples of what to do with the RDD once you get it, or of how to get a SharkContext from a standalone program? Also, my reading of a recent email on this list is that the Shark API will be largely superseded by a more general Spark SQL API in 1.0 (http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html). So if you're just starting out and you don't have short-term needs, that might be a better place to start... On Mon, Apr 7, 2014 at 9:14 AM, Jerry Lam chiling...@gmail.com wrote: Hi Shark, Should I assume that Shark users should not use the Shark APIs since there is no documentation for them? If there is documentation, can you point it out? Best Regards, Jerry On Thu, Apr 3, 2014 at 9:24 PM, Jerry Lam chiling...@gmail.com wrote: Hello everyone, I have successfully installed Shark 0.9 and Spark 0.9 in standalone mode on a cluster of 6 nodes for testing purposes. I would like to use the Shark API in Spark programs. So far I could only find the following:

$ ./bin/shark-shell
scala> val youngUsers = sc.sql2rdd("SELECT * FROM users WHERE age < 20")
scala> println(youngUsers.count)
...
scala> val featureMatrix = youngUsers.map(extractFeatures(_))
scala> kmeans(featureMatrix)

Is there more complete sample code for starting a program using the Shark API in Spark? Thanks! Jerry
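To illustrate the point above that sql2rdd hands back an ordinary RDD, here is a rough sketch of chaining standard RDD operations onto the result. It assumes an existing SharkContext named sc, exactly as in the shark-shell session quoted above, and extractFeatures stands in for a user-defined function:

    // Sketch only -- `sc` is assumed to be a SharkContext
    val youngUsers = sc.sql2rdd("SELECT * FROM users WHERE age < 20")

    // From here on it behaves like any other RDD: cache it, transform it, run actions on it.
    youngUsers.cache()
    println(youngUsers.count)

    // `extractFeatures` is a placeholder for your own row-to-feature-vector logic
    val featureMatrix = youngUsers.map(row => extractFeatures(row))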
Re: Urgently need help interpreting duration
Thank you -- this actually helped a lot. Strangely, it appears that the task detail view is not accurate in 0.8 -- that view shows a 425ms duration for one of the tasks, but in the driver log I do indeed see "Finished TID 125 in 10940ms". On that slow worker I see the following:

14/04/08 18:06:24 INFO broadcast.HttpBroadcast: Started reading broadcast variable 81
14/04/08 18:06:34 INFO storage.MemoryStore: ensureFreeSpace(503292) called with curMem=26011999, maxMem=2662874480
14/04/08 18:06:34 INFO storage.MemoryStore: Block broadcast_81 stored as values to memory (estimated size 491.5 KB, free 2.5 GB)
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: *Reading broadcast variable 81 took 10.051249937 s*
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: Started reading broadcast variable 82
14/04/08 18:06:34 INFO storage.MemoryStore: ensureFreeSpace(503292) called with curMem=26515291, maxMem=2662874480
14/04/08 18:06:34 INFO storage.MemoryStore: Block broadcast_82 stored as values to memory (estimated size 491.5 KB, free 2.5 GB)
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: Reading broadcast variable 82 took 0.027498244 s
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: Started reading broadcast variable 83
14/04/08 18:06:34 INFO storage.MemoryStore: ensureFreeSpace(503292) called with curMem=27018583, maxMem=2662874480

and here is the same variable read on a different worker:

14/04/08 18:06:24 INFO broadcast.HttpBroadcast: Started reading broadcast variable 81
14/04/08 18:06:24 INFO storage.MemoryStore: ensureFreeSpace(503292) called with curMem=35008553, maxMem=2662874480
14/04/08 18:06:24 INFO storage.MemoryStore: Block broadcast_81 stored as values to memory (estimated size 491.5 KB, free 2.4 GB)
14/04/08 18:06:24 INFO broadcast.HttpBroadcast: Reading broadcast variable 81 took 0.029252199 s

Any thoughts on why the read of variable 81 is so slow on one machine? The job is a sum by key across several partitions -- it doesn't seem that variable 81 is any larger than the rest (per the log lines above), so it's puzzling that it's taking so very long... On Tue, Apr 8, 2014 at 12:24 PM, Aaron Davidson ilike...@gmail.com wrote: Also, take a look at the driver logs -- if there is overhead before the first task is launched, the driver logs would likely reveal this. On Tue, Apr 8, 2014 at 9:21 AM, Aaron Davidson ilike...@gmail.com wrote: Off the top of my head, the most likely cause would be driver GC issues. You can diagnose this by enabling GC printing at the driver, and you can fix it by increasing the amount of memory your driver program has (see http://spark.apache.org/docs/0.9.0/tuning.html#garbage-collection-tuning ). The launch time statistic would also be useful -- if all tasks are launched at around the same time and complete within 300ms, yet the total time is 10s, that strongly suggests that the overhead is coming before the first task is launched. Similarly, it would be useful to know if there was a large gap between launch times, or if it appeared that the launch times were serial with respect to the durations. If for some reason Spark started using only one executor, say, each task would take the same duration but would be executed one after another. On Tue, Apr 8, 2014 at 8:11 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi Spark users, I'm very much hoping someone can help me out. I have a strict performance requirement on a particular query. One of the stages shows great variance in duration -- from 300ms to 10sec.
The stage is mapPartitionsWithIndex at Operator.scala:210 (running Spark 0.8). I have run the job quite a few times -- the details within the stage do not account for the overall duration shown for the stage. What could be taking up time that's not showing within the stage breakdown UI? I'm thinking that reading the data in is already reflected in the Duration column, so caching should not be the reason (I'm not caching explicitly)? The details within the stage always show roughly the following (both for the 10-second and the 600ms query -- very little variation, nothing over 500ms, and the Shuffle Write size is pretty comparable):

Task  Status   Locality Level  Executor / Launch Time  Duration  GC Time  Shuffle Write
1864  SUCCESS  NODE_LOCAL      ###                     301 ms    8 ms     111.0 B
1863  SUCCESS  NODE_LOCAL      ###                     273 ms             102.0 B
1862  SUCCESS  NODE_LOCAL      ###                     245 ms             111.0 B
1861  SUCCESS  NODE_LOCAL      ###                     326 ms    4 ms     102.0 B
1860  SUCCESS  NODE_LOCAL      ###                     217 ms    6 ms     102.0 B
1859  SUCCESS  NODE_LOCAL      ###                     277 ms             111.0 B
1858  SUCCESS  NODE_LOCAL      ###                     262 ms             108.0 B
1857  SUCCESS  NODE_LOCAL      ###                     217 ms    14 ms    112.0 B
1856  SUCCESS  NODE_LOCAL      ###                     208 ms             109.0 B
1855  SUCCESS  NODE_LOCAL      ###                     242 ms             74.0 B
1854  SUCCESS  NODE_LOCAL      ###                     218 ms    3 ms     58.0 B
1853  SUCCESS  NODE_LOCAL      ###                     254 ms    12 ms    102.0 B
1852  SUCCESS  NODE_LOCAL      ###                     274 ms    8 ms     77.0 B
Re: spark-shell driver interacting with Workers in YARN mode - firewall blocking communication
I think what you want to do is set spark.driver.port to a fixed port (a rough sketch follows after the quoted message). On Fri, May 2, 2014 at 1:52 PM, Andrew Lee alee...@hotmail.com wrote: Hi All, I encountered this problem when the firewall is enabled between the spark-shell and the Workers. When I launch spark-shell in yarn-client mode, I notice that the Workers in the YARN containers are trying to talk to the driver (spark-shell); however, the firewall is not open, which causes a timeout. The Workers seem to open listening ports in the 54xxx range -- is the port random in this case? What would be a better way to predict the ports so I can configure the firewall correctly between the driver (spark-shell) and the Workers? Is there a range of ports we can specify in the firewall/iptables? Any ideas?
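A minimal sketch of that suggestion for a driver program; this assumes a Spark 1.0-style SparkConf, and the port number is only an example of something you would open in the firewall (spark-shell reads the same spark.* system properties if they are set in its launch environment):

    import org.apache.spark.{SparkConf, SparkContext}

    // Fix the driver's port so worker -> driver traffic can be allowed by a single firewall rule.
    val conf = new SparkConf()
      .setAppName("FirewalledDriver")
      .set("spark.driver.port", "51000")   // example value, not a recommendation
    val sc = new SparkContext(conf)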
Shark resilience to unusable slaves
Hi, I am running into a pretty concerning issue with Shark (granted I'm running v. 0.8.1). I have a Spark slave node that has run out of disk space. When I try to start Shark it attempts to deploy the application to a directory on that node, fails and eventually gives up (I see a Master Removed our application message in the shark server log). Is Spark supposed to be able to ignore a slave if something goes wrong for it (I realize that the slave probably appears alive enough)? I restarted the Spark master in hopes that it would detect that the slave is suffering but it doesn't seem to be the case. Any thoughts appreciated -- we'll monitor disk space but I'm a little worried that the cluster is not functional on account of a single slave.
Re: Running Jars on Spark, program just hanging there
Does the Spark UI show your program running? (http://spark-masterIP:8118). If the program is listed as running, you should be able to see details via the UI. In my experience there are 3 sets of logs -- the log where you're running your program (the driver), the log on the master node, and the log on each executor. The master log often has very useful details when one of your slave executors has an issue; then you can go and read the logs on that machine. Of course, if you have a small number of workers in your cluster you can just read all the logs. That's just general debugging advice... (I also find it useful to check rdd.partitions.size before anything else to see how many partitions the RDD is actually partitioned into -- a tiny example follows after the quoted message...) On Tue, May 27, 2014 at 2:48 PM, Min Li limin...@gmail.com wrote: Hi all, I have a single machine with 8 cores and 8g mem. I've deployed standalone Spark on the machine and successfully run the examples. Now I'm trying to write some simple Java code. I read a local file (23M) into a string list and use the JavaRDD<String> rdds = sparkContext.parallelize() method to get the corresponding RDD. Then I call rdds.count(). But the program just stops at the count(). The last log info is:

14/05/27 14:13:16 INFO SparkContext: Starting job: count at RDDTest.java:40
14/05/27 14:13:16 INFO DAGScheduler: Got job 0 (count at RDDTest.java:40) with 2 output partitions (allowLocal=false)
14/05/27 14:13:16 INFO DAGScheduler: Final stage: Stage 0 (count at RDDTest.java:40)
14/05/27 14:13:16 INFO DAGScheduler: Parents of final stage: List()
14/05/27 14:13:16 INFO DAGScheduler: Missing parents: List()
14/05/27 14:13:16 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37), which has no missing parents
14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140527141316-0003
14/05/27 14:13:16 INFO AppClient$ClientActor: Executor added: app-20140527141316-0003/0 on worker-20140526221107-spark-35303 (spark:35303) with 8 cores
14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140527141316-0003/0 on hostPort spark:35303 with 8 cores, 1024.0 MB RAM
14/05/27 14:13:16 INFO AppClient$ClientActor: Executor updated: app-20140527141316-0003/0 is now RUNNING
14/05/27 14:13:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37)
14/05/27 14:13:16 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/05/27 14:13:17 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@spark:34279/user/Executor#196489168] with ID 0
14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: spark (PROCESS_LOCAL)
14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:0 as 12993529 bytes in 127 ms
14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 0: spark (PROCESS_LOCAL)
14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:1 as 13006417 bytes in 74 ms
14/05/27 14:13:17 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager spark:37617 with 589.2 MB RAM

I tried to figure out what's going on, but just can't. Could anyone please give me some suggestions and point out some possible issues? Best Regards, Min
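A minimal sketch (in the Scala API, with illustrative names) of the sanity check mentioned above -- confirm the partition count and peek at a few elements with a cheap action before running the full count:

    // `lines` stands in for the list read from the 23 MB local file
    val rdd = sc.parallelize(lines)
    println("number of partitions: " + rdd.partitions.size)
    rdd.take(5).foreach(println)   // small action to verify the data looks right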
Master not seeing recovered nodes (Got heartbeat from unregistered worker ...)
Hi, I see this has been asked before but has not gotten any satisfactory answer so I'll try again: (here is the original thread I found: http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3c1394044078706-2312.p...@n3.nabble.com%3E ) I have a set of workers dying and coming back again. The master prints the following warning: Got heartbeat from unregistered worker What is the solution to this -- rolling the master is very undesirable to me as I have a Shark context sitting on top of it (it's meant to be highly available). Insights appreciated -- I don't think an executor going down is very unexpected but it does seem odd that it won't be able to rejoin the working set. I'm running Spark 0.9.1 on CDH
Need some Streaming help
Like many people, I'm trying to do hourly counts. The twist is that I don't want to count per hour of streaming, but per hour of the actual occurrence of the event (wall clock, say yyyy-mm-dd HH). My thought is to make the streaming window large enough that a full hour of streaming data would fit inside it. Since my window slides in small increments, I want to drop the lowest hour from the stream before persisting the results (since it would have been reduced during the previous batch and would be a partial count in the current one). I have gotten this far: every line of the input files is parsed into Event(type, hour), and stream is a DStream[Event]:

val evtCountsByHour = stream.map(evt => (evt, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(secondsInWindow)) // hourly counts per event
  .mapPartitions(iter => iter.map(x => (x._1.hour, x)))

My understanding is that at this point, the event counts are keyed by hour. 1. How do I detect the smallest key? I have seen some examples of partitionBy + mapPartitionsWithIndex and dropping the lowest index, but can't figure out how to do it with a DStream. My gut feeling is that the first RDD in the stream has to contain the oldest data, but that doesn't seem to be the case (printed from inside evtCountsByHour.foreachRDD). 2. If someone is further ahead with this type of problem, could you give some insight into how you approached it -- I think Streaming would be the correct approach, since I don't really want to worry about data that was already processed and I want to process it continuously. I opted for reduceByKeyAndWindow with a large window as opposed to updateStateByKey, as the hour the event occurred in is part of the key and I don't care to keep that key around once the next hour's events are coming in (I'm assuming RDDs outside the window are considered unreferenced). But I'd love to hear other suggestions if my logic is off.
Help with object access from mapper (simple question)
Hi folks, hoping someone can explain to me what's going on: I have the following code, largely based on the RecoverableNetworkWordCount example ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ). I am setting fields on an object that gets accessed within the map function, but the workers do not see the set values. Could someone help me understand what is going on? I suspect the serialization of the object happens not when I think it does:

object Foo {
  var field1 = -1
  var field2 = -2
}

def main(args: Array[String]) {
  // open a yaml file
  Foo.field1 = value_from_yaml
  Foo.field2 = value_from_yaml
  val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => { createContext() })
  ssc.start()
}

def createContext() {
  // create streaming context
  println(Foo)  // <-- Foo's fields are set here
  lines.map(line => {
    println(Foo)  // <-- Foo's fields are NOT set here
  })
}
Re: Help with object access from mapper (simple question)
Thank you so much! I was trying for a singleton and opted against a class, but clearly this backfired. Clearly time to revisit my Scala lessons. Thanks again. On Mon, Jun 23, 2014 at 1:16 PM, Marcelo Vanzin van...@cloudera.com wrote: An object in Scala is similar to a class with only static fields / methods in Java. So when you set its fields in the driver, the object does not get serialized and sent to the executors; they have their own copy of the class and its static fields, which haven't been initialized. Use a proper class, instantiate it, and then use it in the executors, e.g.:

class Foo extends Serializable { ... }

val foo = new Foo()
foo.field1 = blah
lines.map(line => { println(foo) })  // now you should see the field values you set

On Mon, Jun 23, 2014 at 7:44 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, hoping someone can explain to me what's going on: I have the following code, largely based on the RecoverableNetworkWordCount example ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ). I am setting fields on an object that gets accessed within the map function, but the workers do not see the set values. Could someone help me understand what is going on? I suspect the serialization of the object happens not when I think it does:

object Foo {
  var field1 = -1
  var field2 = -2
}

def main(args: Array[String]) {
  // open a yaml file
  Foo.field1 = value_from_yaml
  Foo.field2 = value_from_yaml
  val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => { createContext() })
  ssc.start()
}

def createContext() {
  // create streaming context
  println(Foo)  // <-- Foo's fields are set here
  lines.map(line => {
    println(Foo)  // <-- Foo's fields are NOT set here
  })
}

-- Marcelo
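To spell out the pattern Marcelo describes, a minimal hedged sketch (the class and field names are illustrative, not from the original code): the values are set once in the driver on a Serializable instance, and that instance is captured by the closure and shipped to the executors.

    class Config(var field1: Int = -1, var field2: Int = -2) extends Serializable

    // In the driver: populate the instance (e.g. from the YAML file mentioned above)
    val config = new Config()
    config.field1 = 42
    config.field2 = 7

    // The closure captures `config`, so executors see the populated values
    val tagged = lines.map(line => s"$line [${config.field1}/${config.field2}]")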
Help understanding spark.task.maxFailures
Hi community, this one should be an easy one: I have left spark.task.maxFailures at its default (which should be 4). I see a job that shows the following statistics for Tasks: Succeeded/Total 7109/819 (1 failed). So there were 819 tasks to start with, and I have 2 executors in that cluster. From the Spark docs, spark.task.maxFailures is the number of times to try a task before the job is given up. So I was imagining that 819*4 (i.e. 3276) would be the maximum number I could ever see in the Succeeded column (accounting for retries of every possible task). Even 3276*2 (6552, if it's per task per executor) does not account for 7109 successful tasks. Could anyone help explain why I'm seeing such a high number of succeeded tasks?
Re: Changing log level of spark
Are you looking at the driver log (e.g. Shark's)? I see a ton of information at the INFO level on what query is being started, what stage is starting, and which executor stuff is sent to. So I'm not sure if you're saying you see all that and need more, or that you're not seeing this type of information at all. I cannot speak to the EC2 setup; I'm just pointing out that under 0.9.1 I see quite a bit of scheduling information in the driver log. On Tue, Jul 1, 2014 at 9:20 AM, Philip Limbeck philiplimb...@gmail.com wrote: We changed the log level to DEBUG by replacing every INFO with DEBUG in /root/ephemeral-hdfs/conf/log4j.properties and propagating it to the cluster. There is some DEBUG output visible in both master and worker, but nothing really interesting regarding stages or scheduling. Since we expected a little more than that, there could be 2 possibilities: a) there is still some other, unknown way to set the log level to debug, or b) there is not that much log output to be expected in this direction -- I searched for logDebug (the log wrapper in Spark) on GitHub with 84 results, so I doubt there is much more to expect. We actually just want a little more insight into the system behavior, especially when using Shark, since we ran into some serious concurrency issues with blocking queries. So much for the background on why this is important to us. On Thu, Jun 26, 2014 at 3:30 AM, Aaron Davidson ilike...@gmail.com wrote: If you're using the spark-ec2 scripts, you may have to change /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that is added to the classpath before Spark's own conf. On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp wrote: I have a log4j.xml in src/main/resources with

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  [...]
  <root>
    <priority value="warn" />
    <appender-ref ref="Console" />
  </root>
</log4j:configuration>

and that is included in the jar I package with `sbt assembly`. That works fine for me, at least on the driver. Tobias On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com wrote: Hi! According to https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging, changing the log level is just a matter of creating a log4j.properties (which is on the classpath of Spark) and changing the log level there for the root logger. I did these steps on every node in the cluster (master and worker nodes). However, after a restart there is still no debug output as desired, but only the default INFO log level.
Re: Spark Streaming question batch size
Are you saying that both streams come in at the same rate and you have the same batch interval, but the batch size ends up different? I.e. two datapoints both arriving X seconds after streaming starts end up in two different batches? How do you define "real time values" for both streams? I am trying to do something similar to you, I think -- but I'm not clear on what your notion of time is. My reading of your example above is that the streams just pump data in at different rates -- the first one got 7462 points in the first batch interval, whereas stream 2 saw 10493. On Tue, Jul 1, 2014 at 5:34 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, The window size in Spark Streaming is time based, which means we have a different number of elements in each window. For example, suppose you have two streams (might be more) which are related to each other and you want to compare them over a specific time interval; I am not clear how that will work. Although they start running simultaneously, they might have a different number of elements in each time interval. The following is output for two streams which have the same number of elements and ran simultaneously. The leftmost value is the number of elements in each window. If we add up the number of elements, they are the same for both streams, but we can't compare the two streams as they differ in window size and number of windows. Can we somehow make windows based on real time values for both streams? Or can we make windows based on number of elements? (n, (mean, variance, SD))

Stream 1
(7462,(1.0535658165371238,4242.001306434091,65.13064798107025))
(44826,(0.2546925855084064,5042.890184382894,71.0133099100647))
(245466,(0.2857731601728941,5014.411691661449,70.81251084138628))
(154852,(0.21907814309792514,3483.800160602281,59.023725404300606))
(156345,(0.3075668844414613,7449.528181550462,86.31064929399189))
(156603,(0.27785151491351234,5917.809892281489,76.9273026452994))
(156047,(0.18130350363672296,4019.0232843737017,63.39576708561623))

Stream 2
(10493,(0.5554953964547791,1254.883548218503,35.42433553672536))
(180649,(0.21684831234050583,1095.9634245399352,33.1053383087975))
(179994,(0.22048869512317407,1443.0566458182718,37.98758541705792))
(179455,(0.20473330254938552,1623.9538730448216,40.29831104456888))
(269817,(0.16987953223480945,3270.663944782799,57.18971887308766))
(101193,(0.21469292497504766,1263.0879032808723,35.53994799209577))

Regards, Laeeq
Re: Lost TID: Loss was due to fetch failure from BlockManagerId
A lot of things can get funny when you run distributed as opposed to local -- e.g. some jar not making it over. Do you see anything of interest in the log on the executor machines -- I'm guessing 192.168.222.152/192.168.222.164. From here https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala seems like the warning message is logged after the task fails -- but I wonder if you might see something more useful as to why it failed to begin with. As an example we've had cases in Hdfs where a small example would work, but on a larger example we'd hit a bad file. But the executor log is usually pretty explicit as to what happened... On Tue, Jul 1, 2014 at 8:57 PM, Mohammed Guller moham...@glassbeam.com wrote: I am running Spark 1.0 on a 4-node standalone spark cluster (1 master + 3 worker). Our app is fetching data from Cassandra and doing a basic filter, map, and countByKey on that data. I have run into a strange problem. Even if the number of rows in Cassandra is just 1M, the Spark job goes seems to go into an infinite loop and runs for hours. With a small amount of data (less than 100 rows), the job does finish, but takes almost 30-40 seconds and we frequently see the messages shown below. If we run the same application on a single node Spark (--master local[4]), then we don’t see these warnings and the task finishes in less than 6-7 seconds. Any idea what could be the cause for these problems when we run our application on a standalone 4-node spark cluster? 14/06/30 19:30:16 WARN TaskSetManager: Lost TID 25036 (task 6.0:90) 14/06/30 19:30:16 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:18 WARN TaskSetManager: Lost TID 25310 (task 6.1:0) 14/06/30 19:30:18 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:19 WARN TaskSetManager: Lost TID 25582 (task 6.2:0) 14/06/30 19:30:19 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:21 WARN TaskSetManager: Lost TID 25882 (task 6.3:34) 14/06/30 19:30:21 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(0, 192.168.222.142, 39342, 0) 14/06/30 19:30:22 WARN TaskSetManager: Lost TID 26152 (task 6.4:0) 14/06/30 19:30:22 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(0, 192.168.222.142, 39342, 0) 14/06/30 19:30:23 WARN TaskSetManager: Lost TID 26427 (task 6.5:4) 14/06/30 19:30:23 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:25 WARN TaskSetManager: Lost TID 26690 (task 6.6:0) 14/06/30 19:30:25 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:26 WARN TaskSetManager: Lost TID 26959 (task 6.7:0) 14/06/30 19:30:26 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:28 WARN TaskSetManager: Lost TID 27449 (task 6.8:218) 14/06/30 19:30:28 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:30 WARN TaskSetManager: Lost TID 27718 (task 6.9:0) 14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:31 WARN 
TaskSetManager: Lost TID 27991 (task 6.10:1) 14/06/30 19:30:31 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:33 WARN TaskSetManager: Lost TID 28265 (task 6.11:0) 14/06/30 19:30:33 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:34 WARN TaskSetManager: Lost TID 28550 (task 6.12:0) 14/06/30 19:30:34 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:36 WARN TaskSetManager: Lost TID 28822 (task 6.13:0) 14/06/30 19:30:36 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:37 WARN TaskSetManager: Lost TID 29093 (task 6.14:0) 14/06/30 19:30:37 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:39 WARN TaskSetManager: Lost TID 29366 (task 6.15:0) 14/06/30 19:30:39 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:40 WARN TaskSetManager: Lost TID 29648 (task 6.16:9) 14/06/30 19:30:40 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:42 WARN TaskSetManager: Lost TID 29924 (task
Re: Help alleviating OOM errors
Can you elaborate on why "You need to configure the spark.shuffle.spill true again in the config" -- the default for spark.shuffle.spill is already true according to the docs (https://spark.apache.org/docs/0.9.1/configuration.html)? On the OOM runs the tasks were process_local, which I understand is as good as it gets, but the job was still going 32+ hours later. On Wed, Jul 2, 2014 at 2:40 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Mon, Jun 30, 2014 at 8:09 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, our cluster seems to have a really hard time with OOM errors on the executor. Periodically we'd see a task that gets sent to a few executors, one would OOM, and then the job just stays active for hours (sometimes 30+, whereas normally it completes sub-minute). So I have a few questions: 1. Why am I seeing OOMs to begin with? I'm running with the defaults for spark.storage.memoryFraction and spark.shuffle.memoryFraction, so my understanding is that if Spark exceeds 60% of available memory, data will be spilled to disk? Am I misunderstanding this? In the attached screenshot, I see a single stage with 2 tasks on the same executor -- no disk spills but an OOM. [Mayur, inline:] You need to configure spark.shuffle.spill to true again in the config. What is causing you to OOM? It could be that you are simply doing a sortByKey where the keys are bigger than the executor's memory, causing the OOM -- can you post the stack? 2. How can I reduce the likelihood of seeing OOMs -- I am a bit concerned that I don't see a spill at all, so I'm not sure if decreasing spark.storage.memoryFraction is what needs to be done. 3. Why does an OOM seem to break the executor so hopelessly? I am seeing times upwards of 30hrs once an OOM occurs. Why is that -- the task *should* take under a minute, so even if the whole RDD were recomputed from scratch, 30hrs is very mysterious to me. Hadoop can process this in about 10-15 minutes, so I imagine even if the whole job went to disk it should still not take more than an hour. [Mayur, inline:] When an OOM occurs it could cause the RDD to spill to disk, and the repeated task may be forced to read data from disk, causing the overall slowdown; not to mention the RDD may be sent to a different executor to be processed. Are you seeing the slow tasks as process_local or node_local at least? Any insight into this would be much appreciated. Running Spark 0.9.1
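For completeness, a hedged sketch of how those memory settings can be adjusted (Spark 0.9-era property names; the values are purely illustrative, not recommendations):

    import org.apache.spark.{SparkConf, SparkContext}

    // Shrink the cache fraction to leave more heap for shuffle and task processing.
    // spark.storage.memoryFraction defaults to roughly 0.6 -- the "60%" discussed above;
    // spark.shuffle.spill already defaults to true and is shown only for explicitness.
    val conf = new SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("OOMProneJob")
      .set("spark.storage.memoryFraction", "0.4")
      .set("spark.shuffle.spill", "true")
    val sc = new SparkContext(conf)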
Re: SparkKMeans.scala from examples will show: NoClassDefFoundError: breeze/linalg/Vector
The scripts that Xiangrui mentions set up the classpath... Can you run ./bin/run-example for the provided example successfully? What you can try is to set SPARK_PRINT_LAUNCH_COMMAND=1 and then call run-example -- that will show you the exact java command used to run the example at the start of execution. Assuming you can run the examples successfully, you should be able to just copy that command and add your jar to the front of the classpath. If that works you can start removing extra jars (run-example puts all the example jars on the classpath, which you won't need). As you said, the error you see indicates the class is not available/seen at runtime, but it's hard to tell why. On Wed, Jul 2, 2014 at 2:13 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: I want to make some minor modifications in SparkKMeans.scala, so running the basic example won't do. I have also packaged my code into a jar file with sbt. It completes successfully, but when I try to run it with java -jar myjar.jar I get the same error:

Exception in thread "main" java.lang.NoClassDefFoundError: breeze/linalg/Vector
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
at java.lang.Class.getMethod0(Class.java:2774)
at java.lang.Class.getMethod(Class.java:1663)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)

If scalac -d classes/ SparkKMeans.scala can't see my classpath, why does it succeed in compiling without giving the same error? The error itself, NoClassDefFoundError, means that the files are available at compile time but, for some reason I cannot figure out, they are not available at run time. Does anyone know why? Thank you On Tuesday, July 1, 2014 7:03 PM, Xiangrui Meng men...@gmail.com wrote: You can use either bin/run-example or bin/spark-submit to run example code. scalac -d classes/ SparkKMeans.scala doesn't recognize the Spark classpath. There are examples in the official doc: http://spark.apache.org/docs/latest/quick-start.html#where-to-go-from-here -Xiangrui On Tue, Jul 1, 2014 at 4:39 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: Hello, I have installed spark-1.0.0 with Scala 2.10.3. I have built Spark with sbt/sbt assembly and added /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar to my CLASSPATH variable. Then I went to ../spark-1.0.0/examples/src/main/scala/org/apache/spark/examples, created a new directory classes, and compiled SparkKMeans.scala with scalac -d classes/ SparkKMeans.scala. Then I navigated to classes (I commented out this line in the scala file: package org.apache.spark.examples) and tried to run it with java -cp .
SparkKMeans, and I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: breeze/linalg/Vector
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
at java.lang.Class.getMethod0(Class.java:2774)
at java.lang.Class.getMethod(Class.java:1663)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: breeze.linalg.Vector
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more

The jar at /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar contains the breeze/linalg/Vector* path; I even tried to unpack it and put it on the CLASSPATH, but it does not seem to pick it up. I am currently running Java 1.8: java version 1.8.0_05, Java(TM) SE Runtime Environment (build 1.8.0_05-b13), Java HotSpot(TM) 64-Bit Server VM (build 25.5-b02, mixed mode). What am I doing wrong?
Re: Cannot submit a Spark Application to a remote cluster (Spark 1.0)
class java.io.IOException: Cannot run program "/Users/aris.vlasakakis/Documents/spark-1.0.0/bin/compute-classpath.sh" (in directory "."): error=2, No such file or directory -- By any chance, are your SPARK_HOME directories different on the machine you're submitting from and on the cluster? I'm on an older drop so I'm not sure about the finer points of spark-submit, but I do remember a very similar issue when trying to run a Spark driver on a Windows machine against a Spark master on an Ubuntu cluster (the SPARK_HOME directories were obviously different). On Wed, Jul 9, 2014 at 7:18 PM, Aris Vlasakakis a...@vlasakakis.com wrote: Hello everybody, I am trying to figure out how to submit a Spark application from a separate physical machine to a Spark standalone cluster. I have an application that I wrote in Python that works if I am on the 1-node Spark server itself and, from that Spark installation, run bin/spark-submit with either 1) MASTER=local[*] or 2) MASTER=spark://localhost:7077. However, I want to be on a separate machine that submits a job to Spark. Am I doing something wrong here? I think something is wrong because I am working from two different Spark installations -- as in, on the big server I have one Spark installation and I am running sbin/start-all.sh to run the standalone server (and that works), and then on a separate laptop I have a different installation of spark-1.0.0, but I am using the laptop's bin/spark-submit script to submit to the remote Spark server (using MASTER=spark://remote-spark-master:7077). This submit-to-remote-cluster setup does not work, even for the Scala examples like SparkPi. Concrete example: I want to submit the example SparkPi to the cluster from my laptop. The server is 10.20.10.152, running master and slave; I can look at the Master web UI at http://10.20.10.152:8080. Great. From the laptop (10.20.10.154), I try the following, using bin/run-example from a locally built version of spark 1.0.0 (so that I have the spark-submit script!): bin/spark-submit --verbose --class org.apache.spark.examples.SparkPi --master spark://10.20.10.152:7077 examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar This fails, with the errors at the bottom of this email. Am I doing something wrong? How can I submit to a remote cluster? I get the same problem with bin/spark-submit.
bin/spark-submit --verbose --class org.apache.spark.examples.SparkPi --master spark://10.20.10.152:7077 examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar Using properties file: null Using properties file: null Parsed arguments: master spark://10.20.10.152:7077 deployMode null executorMemory null executorCores null totalExecutorCores null propertiesFile null driverMemorynull driverCores null driverExtraClassPathnull driverExtraLibraryPath null driverExtraJavaOptions null supervise false queue null numExecutorsnull files null pyFiles null archivesnull mainClass org.apache.spark.examples.SparkPi primaryResource file:/Users/aris.vlasakakis/Documents/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar nameorg.apache.spark.examples.SparkPi childArgs [] jarsnull verbose true Default properties from null: Using properties file: null Main class: org.apache.spark.examples.SparkPi Arguments: System properties: SPARK_SUBMIT - true spark.app.name - org.apache.spark.examples.SparkPi spark.jars - file:/Users/aris.vlasakakis/Documents/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar spark.master - spark://10.20.10.152:7077 Classpath elements: file:/Users/aris.vlasakakis/Documents/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar 14/07/09 16:16:08 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/09 16:16:08 INFO SecurityManager: Changing view acls to: aris.vlasakakis 14/07/09 16:16:08 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(aris.vlasakakis) 14/07/09 16:16:08 INFO Slf4jLogger: Slf4jLogger started 14/07/09 16:16:08 INFO Remoting: Starting remoting 14/07/09 16:16:08 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.20.10.154:50478] 14/07/09 16:16:08 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.20.10.154:50478] 14/07/09 16:16:08 INFO SparkEnv: Registering MapOutputTracker 14/07/09 16:16:08 INFO SparkEnv: Registering BlockManagerMaster 14/07/09 16:16:08 INFO DiskBlockManager: Created local directory at
Re: Map Function does not seem to be executing over RDD
Does this line println(Retuning +string) from the hash function print what you expect? If you're not seeing that output in the executor log I'd also put some debug statements in case other, since your match in the interesting case is conditioned on if( fieldsList.contains(index)) -- maybe that doesn't catch what you think it should...if that's the case you can dump out the contents of fieldsList within the other case (i.e. inside the map) and see what's there... On Wed, Jul 9, 2014 at 9:46 PM, Raza Rehman razaurreh...@gmail.com wrote: Hello every one I am having some problem with a simple Scala/ Spark Code in which I am trying to replaces certain fields in a csv with their hashes class DSV (var line:String=,fieldsList:Seq[Int], var delimiter:String=,) extends Serializable { def hash(s:String):String={ var md = MessageDigest.getInstance(sha) md.update(s.getBytes(UTF-8)) var digest = md.digest() val string:Option[String] = Option(digest).map(Hex.valueOf) println(Retuning +string) string.getOrElse() } def anonymizeFields(l:String):String ={ l.split(delimiter,-1).zipWithIndex .map { case (str, index) if( fieldsList.contains(index)) =hash(str) case other = other._1 }.mkString(delimiter) } } I am calling the anonymize function like this but the anondata seems to be the same as the original dsvData var dsvData = sc.textFile(inputPath+inputFileName).map( line=(new DSV(line,List(1,2), \\|)) ) println(Lines Processed=+dsvData.count()) var anonData = dsvData.map(l=l.anonymizeFields(l.line)) println(DSVs Processed=+anonData.count()) anonData.saveAsTextFile(outputPath+outputFileName) I have tried the execution through shell as well but the problem persists. The job does finish but the worker log shows the following error message 14/07/09 11:30:20 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@host:60593] - [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with [akka.tcp://sparkExecutor@host:51397]] [ Regards MRK
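Concretely, the debug version of anonymizeFields I have in mind looks roughly like this (just a sketch, reusing the names from your code; note the println output lands in the executor's stderr, not the driver console):

    def anonymizeFields(l: String): String = {
      l.split(delimiter, -1).zipWithIndex
        .map {
          case (str, index) if fieldsList.contains(index) => hash(str)
          case (str, index) =>
            // debug: dump what the non-matching branch actually sees
            println("not hashing field " + index + " ('" + str + "'); fieldsList=" + fieldsList)
            str
        }.mkString(delimiter)
    }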
Re: incorrect labels being read by MLUtils.loadLabeledData()
I do not believe the order of points in a distributed RDD is in any way guaranteed. For a simple test, you can always add a last column which is an id (make it double and throw it in the feature vector). Printing the rdd back will not give you the points in file order. If you don't want to go that far you can always examine the full feature vector carefully -- points 12 and 14 should differ from your input csv in the feature vector as well as the label. On Thu, Jul 10, 2014 at 6:28 PM, SK skrishna...@gmail.com wrote: Hi, I have a csv data file, which I have organized in the following format to be read as a LabeledPoint(following the example in mllib/data/sample_tree_data.csv): 1,5.1,3.5,1.4,0.2 1,4.9,3,1.4,0.2 1,4.7,3.2,1.3,0.2 1,4.6,3.1,1.5,0.2 The first column is the binary label (1 or 0) and the remaining columns are features. I am using the Logistic Regression Classifier in MLLib to create a model based on the training data and predict the (binary) class of the test data. I use MLUtils.loadLabeledData to read the data file. My prediction accuracy is quite low (compared to the results I got for the same data from R), So I tried to debug, by first verifying that the LabeledData is being read correctly. I find that some of the labels are not read correctly. For example, the first 40 points of the training data have a class of 1, whereas the training data read by loadLabeledData has label 0 for point 12 and point 14. I would like to know if this is because of the distributed algorithm that MLLib uses or if there is something wrong with the format I have above. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/incorrect-labels-being-read-by-MLUtils-loadLabeledData-tp9356.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
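For illustration, a sketch of the id-column idea (the file name is hypothetical; same label-first CSV layout as above):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // tag every line with its position in the file and carry it along as an extra
    // (last) feature, so each point can be traced back to its row in the CSV
    val tagged = sc.textFile("data.csv").zipWithIndex().map { case (line, idx) =>
      val parts = line.split(',').map(_.toDouble)
      LabeledPoint(parts.head, Vectors.dense(parts.tail :+ idx.toDouble))
    }
    tagged.collect().sortBy(_.features.toArray.last).foreach(println)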
Help using streaming from Spark Shell
Hi, I'm starting spark-shell like this: SPARK_MEM=1g SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=3600 /spark/bin/spark-shell -c 3 but when I try to create a streaming context val scc = new StreamingContext(sc, Seconds(10)) I get: org.apache.spark.SparkException: Spark Streaming cannot be used without setting spark.cleaner.ttl; set this property before creating a SparkContext creating a SparkContext (use SPARK_JAVA_OPTS for the shell) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:121) I also tried export SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=3600 before calling spark-shell but with no luck... What am I doing wrong? This is spark 0.9.1 -- I cannot upgrade
[Streaming] updateStateByKey trouble
Hi folks, hoping someone who works with Streaming can help me out. I have the following snippet: val stateDstream = data.map(x = (x, 1)) .updateStateByKey[State](updateFunc) stateDstream.saveAsTextFiles(checkpointDirectory, partitions_test) where data is a RDD of case class StateKey(host:String,hour:String,customer:String) when I dump out the stream, I see duplicate values in the same partition (I've bolded the keys that are identical): (StateKey(foo.com.br,2014-07-22-18,16),State(43,2014-08-06T14:05:29.831Z)) (*StateKey*(www.abcd.com ,2014-07-22-22,25),State(2564,2014-08-06T14:05:29.831Z)) (StateKey(bar.com,2014-07-04-20,29),State(77,2014-08-06T14:05:29.831Z)) (*StateKey*(www.abcd.com ,2014-07-22-22,25),State(1117,2014-08-06T14:05:29.831Z)) I was under the impression that on each batch, the stream will contain a single RDD with Key-Value pairs, reflecting the latest state of each key. Am I misunderstanding this? Or is the key equality somehow failing? Any tips on this appreciated... PS. For completeness State is case class State(val count:Integer,val update_date:DateTime)
Spark working directories
Hi all, trying to change defaults of where stuff gets written. I've set -Dspark.local.dir=/spark/tmp and I can see that the setting is used when the executor is started. I do indeed see directories like spark-local-20140815004454-bb3f in this desired location but I also see undesired stuff under /tmp usr@executor:~# ls /tmp/spark-93f4d44c-ff4d-477d-8930-5884b10b065f/ files jars usr@driver: ls /tmp/spark-7e456342-a58c-4439-ab69-ff8e6d6b56a5/ files jars Is there a way to move these directories off of /tmp? I am running 0.9.1 (SPARK_WORKER_DIR is also exported on all nodes though all that I see there are executor logs) Thanks
Re: a noob question for how to implement setup and cleanup in Spark map
Sean, would this work -- rdd.mapPartitions { partition = Iterator(partition) }.foreach( // Some setup code here // save partition to DB // Some cleanup code here ) I tried a pretty simple example ... I can see that the setup and cleanup are executed on the executor node, once per partition (I used mapPartitionWithIndex instead of mapPartition to track this a little better). Seems like an easier solution than Tobias's but I'm wondering if it's perhaps incorrect On Mon, Aug 18, 2014 at 3:29 AM, Henry Hung ythu...@winbond.com wrote: I slightly modify the code to use while(partitions.hasNext) { } instead of partitions.map(func) I suppose this can eliminate the uncertainty from lazy execution. -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Monday, August 18, 2014 3:10 PM To: MA33 YTHung1 Cc: user@spark.apache.org Subject: Re: a noob question for how to implement setup and cleanup in Spark map I think this was a more comprehensive answer recently. Tobias is right that it is not quite that simple: http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E On Mon, Aug 18, 2014 at 8:04 AM, Henry Hung ythu...@winbond.com wrote: Hi All, Please ignore my question, I found a way to implement it via old archive mails: http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF _KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E Best regards, Henry From: MA33 YTHung1 Sent: Monday, August 18, 2014 2:42 PM To: user@spark.apache.org Subject: a noob question for how to implement setup and cleanup in Spark map Hi All, I’m new to Spark and Scala, just recently using this language and love it, but there is a small coding problem when I want to convert my existing map reduce code from Java to Spark… In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and override the setup(), map() and cleanup() methods. But in the Spark, there is no a method called setup(), so I write the setup() code into map(), but it performs badly. The reason is I create database connection in the setup() once and run() will execute SQL query, then cleanup() will close the connection. Could someone tell me how to do it in Spark? Best regards, Henry Hung The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. It is advised that any unauthorized use of confidential information of Winbond is strictly prohibited; and any information in this email irrelevant to the official business of Winbond shall be deemed as neither given nor endorsed by Winbond. The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. 
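Roughly, the shape is the following (a sketch -- DbClient and insert are made-up stand-ins for whatever connection/write API you actually use; foreachPartition gives the same once-per-partition setup/cleanup without the Iterator(partition) wrapping):

    rdd.foreachPartition { partition =>
      // setup: runs once per partition, on the executor
      val conn = DbClient.connect("jdbc:...")   // DbClient is hypothetical
      try {
        partition.foreach(record => conn.insert(record))   // save the partition to the DB
      } finally {
        conn.close()   // cleanup: also once per partition
      }
    }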
[Streaming] Cannot get executors to stay alive
Hi, I tried a similar question before and didn't get any answers,so I'll try again: I am using updateStateByKey, pretty much exactly as shown in the examples shipping with Spark: def createContext(master:String,dropDir:String, checkpointDirectory:String) = { val updateFunc = (values: Seq[Int], state: Option[Int]) = { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val sparkConf = new SparkConf() .setMaster(master) .setAppName(StatefulNetworkWordCountNoMem) .setJars(StreamingContext.jarOfClass(this.getClass)); val ssc = new StreamingContext(sparkConf, Seconds(10)) ssc.checkpoint(checkpointDirectory) val lines = ssc.textFileStream(dropDir) val wordDstream = lines.map(line ={ val x = line.split(\t) ((x(0), x(1),x(2)), 1) }) wordDstream.print() val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) println(Printing stream) stateDstream.print() ssc } I am running this over a _static_ drop directory (i.e. files are there at the beginning of time, nothing new is coming). The only difference is that I have a pretty wide key space -- about 440K, each key is of length under 20 chars. The code exactly as shown above runs for a while (about an hour) and then the executors start dying with OOM exceptions. I tried executor memory from 512M to 2g (4 executors)-- the only difference is how long it takes for the OOM. The only task that keeps executing is take at DStream(line 586)...which makes sense. What doesn't make sense is why the memory isn't getting reclaimed What am I doing wrong? I'd like to use streaming but it seems that I can't get a process with 0 incoming traffic to stay up. Any advice much appreciated -- I'm sure someone on this list has managed to run a streaming program for longer than an hour!
Re: Spark Streaming checkpoint recovery causes IO re-execution
Can you clarify the scenario: val ssc = new StreamingContext(sparkConf, Seconds(10)) ssc.checkpoint(checkpointDirectory) val stream = KafkaUtils.createStream(...) val wordCounts = lines.flatMap(_.split( )).map(x = (x, 1L)) val wordDstream= wordCounts.updateStateByKey[Int](updateFunc) wordDstream.foreachRDD( rdd=rdd.foreachPartition( //sync state to external store//) ) My impression is that during recovery from checkpoint, your wordDstream would be in the state that it was before the crash +1 batch interval forward when you get to the foreachRDD part -- even if re-creating the pre-crash RDD is really slow. So if your driver goes down at 10:20 and you restart at 10:30, I thought at the time of the DB write wordDstream would have exactly the state of 10:20 + 10seconds worth of aggregated stream data? I don't really understand what you mean by Upon metadata checkpoint recovery (before the data checkpoint occurs) but it sounds like you're observing the same DB write happening twice? I don't have any advice for you but I am interested in understanding better what happens in the recovery scenario so just trying to clarify what you observe. On Thu, Aug 28, 2014 at 6:42 AM, GADV giulio_devec...@yahoo.com wrote: Not sure if this make sense, but maybe would be nice to have a kind of flag available within the code that tells me if I'm running in a normal situation or during a recovery. To better explain this, let's consider the following scenario: I am processing data, let's say from a Kafka streaming, and I am updating a database based on the computations. During the recovery I don't want to update again the database (for many reasons, let's just assume that) but I want my system to be in the same status as before, thus I would like to know if my code is running for the first time or during a recovery so I can avoid to update the database again. More generally I want to know this in case I'm interacting with external entities. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13009.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming checkpoint recovery causes IO re-execution
I understand that the DB writes are happening from the workers unless you collect. My confusion is that you believe workers recompute on recovery(nodes computations which get redone upon recovery). My understanding is that checkpointing dumps the RDD to disk and the cuts the RDD lineage. So I thought on driver restart you'll get a set of new executor processes but they would read the last known state of the RDD from HDFS checkpoint. Am I off here? So the only situation I can imagine where you end up recomputing is if your checkpointing at a larger interval than your batch size (i.e. the RDD on disk does not reflect it's last precrash state)? On Thu, Aug 28, 2014 at 1:32 PM, RodrigoB rodrigo.boav...@aspect.com wrote: Hi Yana, The fact is that the DB writing is happening on the node level and not on Spark level. One of the benefits of distributed computing nature of Spark is enabling IO distribution as well. For example, is much faster to have the nodes to write to Cassandra instead of having them all collected at the driver level and sending the writes from there. The problem is that nodes computations which get redone upon recovery. If these lambda functions send events to other systems these events would get resent upon re-computation causing overall system instability. Hope this helps you understand the problematic. tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13043.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Java Configuration.
JavaSparkContext java_SC = new JavaSparkContext(conf); is the spark context. An application has a single spark context -- you won't be able to keep calling this -- you'll see an error if you try to create a second such object from the same application. Additionally, depending on your configuration, if you create a few different apps that each create a spark context, you'll see them all connected to the master in the UI. But they'll have to share executors on the worker machines you have available. You'll often see messages like No resources available if you are trying to run more than 1 app concurrently and the first app you start is resource greedy Hope this helps. On Tue, Sep 2, 2014 at 10:02 AM, pcsenthil pcsent...@gmail.com wrote: Team, I am new to Apache Spark and I didn't have much knowledge on hadoop or big data. I need clarifications on the below, How does Spark Configuration works, from a tutorial i got the below /SparkConf conf = new SparkConf().setAppName(Simple application) .setMaster(local[4]); JavaSparkContext java_SC = new JavaSparkContext(conf);/ from this, i understood that we are providing the config through java program to Spark. Let us assume i have written this in a separate java method. My question are what happen if i am keep on calling this? If this one will will keep on creating new objects for spark on each call, then how we are going to handle the JVM memory? Since under each object i am trying to run 4 concurrent threads? Is there any option to find existing one in JVM, so instead of creating new Spark object i can go with it? Please help me on this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Configuration-tp13269.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
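One common way to avoid re-creating the context is to hide it behind a lazily-initialized singleton and hand that single instance to every caller. A sketch (in Scala for brevity; the object name is made up):

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkHolder {
      // created once, the first time it is asked for, then reused by every caller
      lazy val sc: SparkContext = new SparkContext(
        new SparkConf().setAppName("Simple application").setMaster("local[4]"))
    }

Every caller then uses SparkHolder.sc rather than constructing a new context of its own.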
[MLlib] How do you normalize features?
It seems like the next release will add a nice org.apache.spark.mllib.feature package but what is the recommended way to normalize features in the current release (1.0.2) -- I'm hoping for a general pointer here. At the moment I have a RDD[LabeledPoint] and I can get a MultivariateStatisticalSummary for mean/variance. Is that about the right way to proceed? I'm also not seeing an easy way to subtract vectors -- do I need to do this element-wise? thanks
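To make the question concrete, the element-wise version I'm picturing looks roughly like this (just a sketch, assuming I pull the per-column mean and stddev out of the summary first):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    // (x - mean) / stddev, done element-wise against the per-column statistics
    def standardize(data: RDD[LabeledPoint], mean: Array[Double], stddev: Array[Double]): RDD[LabeledPoint] =
      data.map { p =>
        val scaled = p.features.toArray.zipWithIndex.map { case (v, i) =>
          if (stddev(i) == 0.0) 0.0 else (v - mean(i)) / stddev(i)
        }
        LabeledPoint(p.label, Vectors.dense(scaled))
      }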
Re: Object serialisation inside closures
In the third case the object does not get shipped around. Each executor will create it's own instance. I got bitten by this here: http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-object-access-from-mapper-simple-question-tt8125.html On Thu, Sep 4, 2014 at 9:29 AM, Andrianasolo Fanilo fanilo.andrianas...@worldline.com wrote: Hello Spark fellows J I’m a new user of Spark and Scala and have been using both for 6 months without too many problems. Here I’m looking for best practices for using non-serializable classes inside closure. I’m using Spark-0.9.0-incubating here with Hadoop 2.2. Suppose I am using OpenCSV parser to parse an input file. So inside my main : val sc = new SparkContext(local[2], App) val heyRDD = sc.textFile(…) val csvparser = new CSVParser(';') val heyMap = heyRDD.map { line = val temp = csvparser.parseLine(line) (temp(1), temp(4)) } This gives me a java.io.NotSerializableException: au.com.bytecode.opencsv.CSVParser, which seems reasonable. From here I could see 3 solutions : 1/ Extending CSVParser with Serialisable properties, which adds a lot of boilerplate code if you ask me 2/ Using Kryo Serialization (still need to define a serializer) 3/ Creating an object with an instance of the class I want to use, typically : object CSVParserPlus { val csvParser = new CSVParser(';') def parse(line: String) = { csvParser.parseLine(line) } } val heyMap = heyRDD.map { line = val temp = CSVParserPlus.parse(line) (temp(1), temp(4)) } Third solution works and I don’t get how, so I was wondering how worked the closure system inside Spark to be able to serialize an object with a non-serializable instance. How does that work ? Does it hinder performance ? Is it a good solution ? How do you manage this problem ? Any input would be greatly appreciated Best regards, Fanilo -- Ce message et les pièces jointes sont confidentiels et réservés à l'usage exclusif de ses destinataires. Il peut également être protégé par le secret professionnel. Si vous recevez ce message par erreur, merci d'en avertir immédiatement l'expéditeur et de le détruire. L'intégrité du message ne pouvant être assurée sur Internet, la responsabilité de Worldline ne pourra être recherchée quant au contenu de ce message. Bien que les meilleurs efforts soient faits pour maintenir cette transmission exempte de tout virus, l'expéditeur ne donne aucune garantie à cet égard et sa responsabilité ne saurait être recherchée pour tout dommage résultant d'un virus transmis. This e-mail and the documents attached are confidential and intended solely for the addressee; it may also be privileged. If you receive this e-mail in error, please notify the sender immediately and destroy it. As its integrity cannot be secured on the Internet, the Worldline liability cannot be triggered for the message content. Although the sender endeavours to maintain a computer virus-free network, the sender does not warrant that this transmission is virus-free and will not be liable for any damages resulting from any virus transmitted.
Re: Multiple spark shell sessions
These are just warnings from the web server. Normally your application will have a UI page on port 4040. In your case, a little after the warning it should bind just fine to another port (mine picked 4041). Im running on 0.9.1. Do you actually see the application failing? The main thing when running more than one apps against the master is to make sure you have enough resources for each (i.e. neither is grabbing too many resources or cores). You can see if your app is successfully connected by looking at the master UI page. On Thu, Sep 4, 2014 at 5:58 AM, Dhimant dhimant84.jays...@gmail.com wrote: Hi, I am receiving following error while connecting the spark server via shell if one shell is already open. How can I open multiple sessions ? Does anyone know abt Workflow Engine/Job Server like apache oozie for spark ? / Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.0.2 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_60) Type in expressions to have them evaluated. Type :help for more information. 14/09/04 15:07:46 INFO spark.SecurityManager: Changing view acls to: root 14/09/04 15:07:46 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/04 15:07:46 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/09/04 15:07:47 INFO Remoting: Starting remoting 14/09/04 15:07:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@sparkmaster.guavus.com:42236] 14/09/04 15:07:47 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@sparkmaster.guavus.com:42236] 14/09/04 15:07:47 INFO spark.SparkEnv: Registering MapOutputTracker 14/09/04 15:07:47 INFO spark.SparkEnv: Registering BlockManagerMaster 14/09/04 15:07:47 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140904150747-4dcd 14/09/04 15:07:47 INFO storage.MemoryStore: MemoryStore started with capacity 294.9 MB. 
14/09/04 15:07:47 INFO network.ConnectionManager: Bound socket to port 54453 with id = ConnectionManagerId(sparkmaster.guavus.com,54453) 14/09/04 15:07:47 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/09/04 15:07:47 INFO storage.BlockManagerInfo: Registering block manager sparkmaster.guavus.com:54453 with 294.9 MB RAM 14/09/04 15:07:47 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/04 15:07:47 INFO spark.HttpServer: Starting HTTP Server 14/09/04 15:07:47 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/04 15:07:47 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48977 14/09/04 15:07:47 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.1.21:48977 14/09/04 15:07:47 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-0e45759a-2c58-439a-8e96-95b0bc1d6136 14/09/04 15:07:47 INFO spark.HttpServer: Starting HTTP Server 14/09/04 15:07:47 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/04 15:07:47 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:39962 14/09/04 15:07:48 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/04 15:07:48 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:444) at sun.nio.ch.Net.bind(Net.java:436) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$$anonfun$1.apply$mcV$sp(JettyUtils.scala:192) at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192) at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.ui.JettyUtils$.connect$1(JettyUtils.scala:191) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:205) at org.apache.spark.ui.WebUI.bind(WebUI.scala:99) at org.apache.spark.SparkContext.init(SparkContext.scala:223) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:957) at
Re: error: type mismatch while Union
Which version are you using -- I can reproduce your issue w/ 0.9.2 but not with 1.0.1...so my guess is that it's a bug and the fix hasn't been backported... No idea on a workaround though.. On Fri, Sep 5, 2014 at 7:58 AM, Dhimant dhimant84.jays...@gmail.com wrote: Hi, I am getting type mismatch error while union operation. Can someone suggest solution ? / case class MyNumber(no: Int, secondVal: String) extends Serializable with Ordered[MyNumber] { override def toString(): String = this.no.toString + + this.secondVal override def compare(that: MyNumber): Int = this.no compare that.no override def compareTo(that: MyNumber): Int = this.no compare that.no def Equals(that: MyNumber): Boolean = { (this.no == that.no) (that match { case MyNumber(n1, n2) = n1 == no n2 == secondVal case _ = false }) } } val numbers = sc.parallelize(1 to 20, 10) val firstRdd = numbers.map(new MyNumber(_, A)) val secondRDD = numbers.map(new MyNumber(_, B)) val numberRdd = firstRdd .union(secondRDD ) console:24: error: type mismatch; found : org.apache.spark.rdd.RDD[MyNumber] required: org.apache.spark.rdd.RDD[MyNumber] val numberRdd = onenumberRdd.union(anotherRDD)/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-type-mismatch-while-Union-tp13547.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Cannot run SimpleApp as regular Java app
spark-submit is a script which calls spark-class script. Can you output the command that spark-class runs (say, by putting set -x before the very last line?). You should see the java command that is being run. The scripts do some parameter setting so it's possible you're missing something. It seems to me you think your worker memory is 2G but the executor is clearly launched with -Xms512M -Xmx512M...so that's all you'd get. On Mon, Sep 8, 2014 at 10:16 AM, ericacm eric...@gmail.com wrote: Dear all: I am a brand new Spark user trying out the SimpleApp from the Quick Start page. Here is the code: object SimpleApp { def main(args: Array[String]) { val logFile = /dev/spark-1.0.2-bin-hadoop2/README.md // Should be some file on your system val conf = new SparkConf() .setAppName(Simple Application) .set(spark.executor.memory, 512m) .setMaster(spark://myhost.local:7077) .setJars(Seq(/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar)) val sc = new SparkContext(conf) try { val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line = line.contains(a)).count() val numBs = logData.filter(line = line.contains(b)).count() println(Lines with a: %s, Lines with b: %s.format(numAs, numBs)) } finally { sc.stop() } } } I am using Spark 1.0.2 and Scala 2.10.4. In spark-env.sh I have SPARK_WORKER_MEMORY=2g. I am trying to run this as a standalone Java app in my IDE. Note that this code *does* work when I either - Change the master to local (works running from IDE) - Run it using spark-submit The application/driver log is: 14/09/08 10:03:55 INFO spark.SecurityManager: Changing view acls to: eric 14/09/08 10:03:55 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(eric) 14/09/08 10:03:56 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/09/08 10:03:56 INFO Remoting: Starting remoting 14/09/08 10:03:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.0.1.5:61645] 14/09/08 10:03:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.0.1.5:61645] 14/09/08 10:03:56 INFO spark.SparkEnv: Registering MapOutputTracker 14/09/08 10:03:56 INFO spark.SparkEnv: Registering BlockManagerMaster 14/09/08 10:03:56 INFO storage.DiskBlockManager: Created local directory at /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-local-20140908100356-2496 14/09/08 10:03:56 INFO storage.MemoryStore: MemoryStore started with capacity 279.5 MB. 
14/09/08 10:03:56 INFO network.ConnectionManager: Bound socket to port 61646 with id = ConnectionManagerId(10.0.1.5,61646) 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/09/08 10:03:56 INFO storage.BlockManagerInfo: Registering block manager 10.0.1.5:61646 with 279.5 MB RAM 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/08 10:03:56 INFO spark.HttpServer: Starting HTTP Server 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 14/09/08 10:03:57 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61647 14/09/08 10:03:57 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.0.1.5:61647 14/09/08 10:03:57 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-d5637279-5caa-4c14-a00f-650f1dd915bc 14/09/08 10:03:57 INFO spark.HttpServer: Starting HTTP Server 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 14/09/08 10:03:57 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61648 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 14/09/08 10:03:57 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/09/08 10:03:57 INFO ui.SparkUI: Started SparkUI at http://10.0.1.5:4040 2014-09-08 10:03:57.567 java[58736:1703] Unable to load realm info from SCDynamicStore 14/09/08 10:03:57 INFO spark.SparkContext: Added JAR /spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar at http://10.0.1.5:61648/jars/spark-experiments-1.0-SNAPSHOT.jar with timestamp 1410185037723 14/09/08 10:03:57 INFO client.AppClient$ClientActor: Connecting to master spark://myhost.local:7077... 14/09/08 10:03:57 INFO storage.MemoryStore: ensureFreeSpace(32960) called with curMem=0, maxMem=293063884 14/09/08 10:03:57 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.2 KB, free 279.5 MB) 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140908100358-0002 14/09/08 10:03:58 INFO client.AppClient$ClientActor: Executor added: app-20140908100358-0002/0 on worker-20140908100129-10.0.1.5-61526 (10.0.1.5:61526) with 8 cores 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID
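On the memory point: the 512M in the executor launch command most likely comes from the spark.executor.memory setting in the code above, not from SPARK_WORKER_MEMORY (which only caps what a worker can hand out), so if you want bigger executors that's the line to change -- for example:

    val conf = new SparkConf()
      .setAppName("Simple Application")
      .setMaster("spark://myhost.local:7077")
      .set("spark.executor.memory", "1g")   // was 512m; must stay within SPARK_WORKER_MEMORY (2g here)
      .setJars(Seq("/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar"))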
Re: Problem in running mosek in spark cluster - java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)
If that library has native dependencies you'd need to make sure that the native code is on all boxes and in the path with export SPARK_LIBRARY_PATH=... On Tue, Sep 9, 2014 at 10:17 AM, ayandas84 ayanda...@gmail.com wrote: We have a small apache spark cluster of 6 computers. We are trying to solve a distributed problem which requires solving a optimization problem at each machine during a spark map operation. We decided to use mosek as the solver and I collected an academic license to this end. We observed that mosek works fine in a single system. However, when we prepare a jar file, include the mosek.jar into the library and try to run the jar in the cluster as a spark job it gives errors. java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path Does this problem has any thing to do with the license? We have set the necessary path variables i n the profile of the user in the master machine but we are not sure about what changes needs to be made to the other machines in the cluster. We shall be greatly obliged if you please suggest the necessary solution and help us out. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-running-mosek-in-spark-cluster-java-lang-UnsatisfiedLinkError-no-mosekjava7-0-in-java-lib-tp13799.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark.cleaner.ttl and spark.streaming.unpersist
Tim, I asked a similar question twice: here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html and here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html and have not yet received any responses. I noticed that the heapdump only contains a very large byte array consuming about 66%(the second link contains a picture of my heap -- I ran with a small heap to be able to get the failure quickly) I don't have solutions but wanted to affirm that I've observed a similar situation... On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote: I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I don't think stale RDDs are an issue here. I did a jmap -histo on a couple of running receiver processes and in a heap of 30G, roughly ~16G is taken by [B which is byte arrays. Still investigating more and would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it. On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I somehow missed that parameter when I was reviewing the documentation, that should do the trick! Thank you! 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com: Hi Luis, The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be used to remove useless timeout streaming data, the difference is that “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming input data, but also Spark’s useless metadata; while “spark.streaming.unpersist” is reference-based cleaning mechanism, streaming data will be removed when out of slide duration. Both these two parameter can alleviate the memory occupation of Spark Streaming. But if the data is flooded into Spark Streaming when start up like your situation using Kafka, these two parameters cannot well mitigate the problem. Actually you need to control the input data rate to not inject so fast, you can try “spark.straming.receiver.maxRate” to control the inject rate. Thanks Jerry *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] *Sent:* Wednesday, September 10, 2014 5:21 AM *To:* user@spark.apache.org *Subject:* spark.cleaner.ttl and spark.streaming.unpersist The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because is the first run and there are quite a few events on the kafka queues that are consumed at a rate of 100K events per sec. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. And I also wonder if new RDD are being batched while a RDD is being processed. Regards, Luis
Need help with ThriftServer/Spark1.1.0
Hi ladies and gents, trying to get Thrift server up and running in an effort to replace Shark. My first attempt to run sbin/start-thriftserver resulted in: 14/09/15 17:09:05 ERROR TThreadPoolServer: Error occurred during processing of message. java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129) at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:178) at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) ... 4 more Caused by: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:196) at java.net.SocketInputStream.read(SocketInputStream.java:122) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read1(BufferedInputStream.java:275) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) After turing logging levels up it seemed like this error is related to SASL and the SO advice was to turn it off via: propertynamehive.server2.authentication/namevalueNOSASL/value/property But I still have no luck: (this is the full command that gets run) java -cp /spark-1.1.0-bin-cdh4/conf:/spark-1.1.0-bin-cdh4/lib/spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-core-3.2.2.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-rdbms-3.2.1.jar:/a/shark/spark-1.1.0-bin-cdh4/lib/datanucleus-api-jdo-3.2.1.jar:/hadoop/share/hadoop/mapreduce1//conf -XX:MaxPermSize=128m-Xms4012m -Xmx4012m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master spark://master-ip:7077 spark-internal --hiveconf hive.server.thrift.bind.host ip-to-bind 14/09/15 17:05:05 ERROR TThreadPoolServer: Error occurred during processing of message. java.lang.ClassCastException: org.apache.thrift.transport.TSocket cannot be cast to org.apache.thrift.transport.TSaslServerTransport at org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:53) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Any idea what might be going on? I compiled w/ -Phive against the 1.1.0. hive-site.conf is the conf file we used previously. 
SparkSQL does work for me but does not have a lot of functionality I need. Any help appreciated -- I do acknowledge this is likely more of a Hive question than a Spark one... If there is a precompiled Spark build for CDH4 that includes the Thrift server I'd be happy to try that too... thanks again.
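For clarity, the hive-site.xml property I set is:

    <property>
      <name>hive.server2.authentication</name>
      <value>NOSASL</value>
    </property>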
Re: Serving data
If your dashboard is doing ajax/pull requests against, say, a REST API, you can always create a Spark context in your REST service and use SparkSQL to query over the parquet files. The parquet files are already on disk, so it seems silly to write both to parquet and to a DB...unless I'm missing something in your setup. On Tue, Sep 16, 2014 at 4:18 AM, Marius Soutier mps@gmail.com wrote: Writing to Parquet and querying the result via SparkSQL works great (except for some strange SQL parser errors). However the problem remains, how do I get that data back to a dashboard. So I guess I’ll have to use a database after all. You can batch up data and store it into parquet partitions as well, then query it using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1, I believe.
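As a rough sketch of what I mean (paths and table names are made up, and depending on the Spark version the registration call is registerTempTable or registerAsTable):

    // inside the REST service, against a long-lived SparkContext sc
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val events = sqlContext.parquetFile("hdfs:///data/dashboard/events.parquet")   // hypothetical path
    events.registerTempTable("events")   // registerAsTable on pre-1.1 builds
    val perDay = sqlContext.sql("SELECT day, count(*) FROM events GROUP BY day").collect()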
Re: persistent state for spark streaming
I don't think persist is meant for end-user usage. You might want to call saveAsTextFiles, for example, if you're saving to the file system as strings. You can also dump the DStream to a DB -- there are samples on this list (you'd have to do a combo of foreachRDD and mapPartitions, likely) On Wed, Oct 1, 2014 at 3:49 AM, Chia-Chun Shih chiachun.s...@gmail.com wrote: Hi, My application is to digest user logs and deduct user quotas. I need to maintain latest states of user quotas persistently, so that latest user quotas will not be lost. I have tried *updateStateByKey* to generate and a DStream for user quotas and called *persist(StorageLevel.MEMORY_AND_DISK())*, but it didn't work. Are there better approaches to persist states for spark streaming? Thanks.
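The DB variant would look something like this (a sketch only -- QuotaStore is a made-up stand-in for whatever external store you use, and quotaDstream stands for the DStream coming out of updateStateByKey):

    quotaDstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val store = QuotaStore.connect()   // hypothetical external store, opened once per partition
        partition.foreach { case (user, quota) => store.put(user, quota) }
        store.close()
      }
    }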
Re: persistent state for spark streaming
Yes -- persist is more akin to caching -- it's telling Spark to materialize that RDD for fast reuse but it's not meant for the end user to query/use across processes, etc.(at least that's my understanding). On Thu, Oct 2, 2014 at 4:04 AM, Chia-Chun Shih chiachun.s...@gmail.com wrote: Hi Yana, So, user quotas need another data store, which can guarantee persistence and afford frequent data updates/access. Is it correct? Thanks, Chia-Chun 2014-10-01 21:48 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: I don't think persist is meant for end-user usage. You might want to call saveAsTextFiles, for example, if you're saving to the file system as strings. You can also dump the DStream to a DB -- there are samples on this list (you'd have to do a combo of foreachRDD and mapPartitions, likely) On Wed, Oct 1, 2014 at 3:49 AM, Chia-Chun Shih chiachun.s...@gmail.com wrote: Hi, My application is to digest user logs and deduct user quotas. I need to maintain latest states of user quotas persistently, so that latest user quotas will not be lost. I have tried *updateStateByKey* to generate and a DStream for user quotas and called *persist(StorageLevel.MEMORY_AND_DISK())*, but it didn't work. Are there better approaches to persist states for spark streaming? Thanks.
[SparkSQL] Function parity with Shark?
Hi, in an effort to migrate off of Shark I recently tried the Thrift JDBC server that comes with Spark 1.1.0. However, I observed that conditional functions do not work (I tried 'case' and 'coalesce'), and some string functions like 'concat' also did not work. Is there a list of what's missing, or a roadmap of when it will be added? (I know percentiles are pending, for example, but I do not see JIRAs for the other functions mentioned in this email.)
Re: [SparkSQL] Function parity with Shark?
) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at scala.collection.immutable.$colon$colon.writeObject(List.scala:379) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Let me know if any of these warrant a JIRA thanks On Thu, Oct 2, 2014 at 2:00 PM, Michael Armbrust mich...@databricks.com wrote: What are the errors you are seeing. All of those functions should work. On Thu, Oct 2, 2014 at 6:56 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, in an effort to migrate off of Shark I recently tried the Thrift JDBC server that comes with Spark 1.1.0. However I observed that conditional functions do not work (I tried 'case' and 'coalesce') some string functions like 'concat' also did not work. Is there a list of what's missing or a roadmap of when it will be added? (I know percentiles are pending, for example but do not see JIRAs for the others in this email).
Re: Akka connection refused when running standalone Scala app on Spark 0.9.2
when you're running spark-shell and the example, are you actually specifying --master spark://master:7077 as shown here: http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark because if you're not, your spark-shell is running in local mode and not actually connecting to the cluster. Also, if you run spark-shell against the cluster, you'll see it listed under the Running applications in the master UI. It would be pretty odd for spark shell to connect successfully to the cluster but for your app to not connect...(which is why I suspect that you're running spark-shell local) Another thing to check, the executors need to connect back to your driver, so it could be that you have to set the driver host or driver port...in fact looking at your executor log, this seems fairly likely: is host1/xxx.xx.xx.xx:45542 the machine where your driver is running? is that host/port reachable from the worker machines? On Fri, Oct 3, 2014 at 5:32 AM, Irina Fedulova fedul...@gmail.com wrote: Hi, I have set up Spark 0.9.2 standalone cluster using CDH5 and pre-built spark distribution archive for Hadoop 2. I was not using spark-ec2 scripts because I am not on EC2 cloud. Spark-shell seems to be working properly -- I am able to perform simple RDD operations, as well as e.g. SparkPi standalone example works well when run via `run-example`. Web UI shows all workers connected. However, standalone Scala application gets connection refused messages. I think this has something to do with configuration, because spark-shell and SparkPi works well. I verified that .setMaster and .setSparkHome are properly assigned within scala app. Is there anything else in configuration of standalone scala app on spark that I am missing? I would very much appreciate any clues. Namely, I am trying to run MovieLensALS.scala example from AMPCamp big data mini course (http://ampcamp.berkeley.edu/big-data-mini-course/movie- recommendation-with-mllib.html). Here is error which I get when try to run compiled jar: --- root@master:~/machine-learning/scala# sbt/sbt package run /movielens/medium Launching sbt from sbt/sbt-launch-0.12.4.jar [info] Loading project definition from /root/training/machine- learning/scala/project [info] Set current project to movielens-als (in build file:/root/training/machine-learning/scala/) [info] Compiling 1 Scala source to /root/training/machine- learning/scala/target/scala-2.10/classes... [warn] there were 2 deprecation warning(s); re-run with -deprecation for details [warn] one warning found [info] Packaging /root/training/machine-learning/scala/target/scala-2.10/movielens-als_2.10-0.0.jar ... [info] Done packaging. [success] Total time: 6 s, completed Oct 2, 2014 1:19:00 PM [info] Running MovieLensALS /movielens/medium master = spark://master:7077 log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/10/02 13:19:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable HERE THERE 14/10/02 13:19:02 INFO FileInputFormat: Total input paths to process : 1 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 0 on host2: remote Akka client disassociated 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 1 (task 0.0:1) 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 0 (task 0.0:0) 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 4 on host5: remote Akka client disassociated 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 3 (task 0.0:1) 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 1 on host4: remote Akka client disassociated 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 2 (task 0.0:0) 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 4 (task 0.0:1) 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 3 on host3: remote Akka client disassociated 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 6 (task 0.0:0) 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 2 on host1: remote Akka client disassociated 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 5 (task 0.0:1) 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 7 (task 0.0:0) 14/10/02 13:19:04 ERROR TaskSchedulerImpl: Lost executor 6 on host4: remote Akka client disassociated 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 8 (task 0.0:0) 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 9 (task 0.0:1) 14/10/02 13:19:04 ERROR TaskSchedulerImpl: Lost executor 5 on host2: remote Akka client disassociated 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 10 (task 0.0:1) 14/10/02 13:19:04 ERROR TaskSchedulerImpl: Lost executor 7 on host5: remote Akka client disassociated 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 11 (task 0.0:0) 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 12 (task 0.0:1) 14/10/02
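If the callback address is the problem, the usual fix is to pin the driver's host (and, if a firewall is involved, the port) to something the workers can actually route to -- for example (the values here are placeholders):

    val conf = new SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("MovieLensALS")
      .set("spark.driver.host", "driver-box.example.com")   // an address reachable from the workers
      .set("spark.driver.port", "51000")                     // optional: fix the port if firewalls are in the way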
Re: Akka connection refused when running standalone Scala app on Spark 0.9.2
I don't think it's a red herring... (btw. spark.driver.host needs to be set to the IP or FQDN of the machine where you're running the program). I am running 0.9.2 on CDH4 and the beginning of my executor log looks like below (I've obfuscated the IP -- this is the log from executor a100-2-200-245). My driver is running on a100-2-200-238. I am not specifically setting spark.driver.host or the port but depending on how your machine is setup you might need to: SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 14/10/03 18:14:48 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/10/03 18:14:48 INFO Remoting: Starting remoting 14/10/03 18:14:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@a100-2-200-245:56760] 14/10/03 18:14:48 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@a100-2-200-245:56760] **14/10/03 18:14:48 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@a100-2-200-238:61505/user/CoarseGrainedScheduler** 14/10/03 18:14:48 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@a100-2-200-245:48067/user/Worker 14/10/03 18:14:48 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@a100-2-200-245:48067/user/Worker **14/10/03 18:14:49 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver** 14/10/03 18:14:49 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/10/03 18:14:49 INFO Remoting: Starting remoting If you look at the lines with ** this is where the driver successfully connects and at this point you should see your app show up in the UI under Running applications...The worker log you're posting -- is that the log that stored under work/app-id/executor-id/stderr? The first line you show in that log is INFO worker.Worker: Executor app-20141002131901-0002/9 finished with state FAILED but I imagine something prior to that would say why the executor failed? On Fri, Oct 3, 2014 at 2:56 PM, Irina Fedulova fedul...@gmail.com wrote: Yana, many thanks for looking into this! I am not running spark-shell in local mode, I am really starting spark-shell with --master spark://master:7077 and run in cluster mode. Second thing is I tried to set spark.driver.host to master both in scala app when creating context, and in conf/spark-defaults.conf file, but this did not make any difference. Worker logs still have same messages: 14/10/03 13:37:30 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@host2:51414] - [akka.tcp://sparkExecutor@host2:53851]: Error [Association failed with [akka.tcp://sparkExecutor@host2:53851]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@host2:53851] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: host2/xxx.xx.xx.xx:53851 ] note that host1, host2 etc are slave hostnames, and each slave has error message about itself: host1:some random port cannot connect to host1:some random port. However I noticed that after running successfully SparkPi app log also is populated with similar connection refused messages, but this does not lead to application death... So these worker logs are probably a false clue. 
On 03.10.14 19:37, Yana Kadiyska wrote: when you're running spark-shell and the example, are you actually specifying --master spark://master:7077 as shown here: http://spark.apache.org/docs/latest/programming-guide.html# initializing-spark because if you're not, your spark-shell is running in local mode and not actually connecting to the cluster. Also, if you run spark-shell against the cluster, you'll see it listed under the Running applications in the master UI. It would be pretty odd for spark shell to connect successfully to the cluster but for your app to not connect...(which is why I suspect that you're running spark-shell local) Another thing to check, the executors need to connect back to your driver, so it could be that you have to set the driver host or driver port...in fact looking at your executor log, this seems fairly likely: is host1/xxx.xx.xx.xx:45542 the machine where your driver is running? is that host/port reachable from the worker machines? On Fri, Oct 3, 2014 at 5:32 AM, Irina Fedulova fedul...@gmail.com mailto:fedul...@gmail.com wrote: Hi, I have set up Spark 0.9.2 standalone cluster using CDH5 and pre-built spark distribution archive for Hadoop 2. I was not using spark-ec2 scripts because I am not on EC2 cloud. Spark-shell seems to be working properly -- I am able to perform simple RDD operations, as well as e.g. SparkPi standalone example works well when run via `run-example`. Web UI shows all workers connected. However, standalone Scala application gets connection refused messages. I think this has something to do with configuration, because spark-shell
Re: [SparkSQL] Function parity with Shark?
I have created https://issues.apache.org/jira/browse/SPARK-3814 https://issues.apache.org/jira/browse/SPARK-3815 Will probably try my hand at 3814, seems like a good place to get started... On Fri, Oct 3, 2014 at 3:06 PM, Michael Armbrust mich...@databricks.com wrote: Thanks for digging in! These both look like they should have JIRAs. On Fri, Oct 3, 2014 at 8:14 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Thanks -- it does appear that I misdiagnosed a bit: case works generally but it doesn't seem to like the bit operation, which does not seem to work (type of bit_field in Hive is bigint): Error: java.lang.RuntimeException: Unsupported language features in query: select (case when bit_field 1=1 then r_end - r_start else NULL end) from mytable where pkey='0178-2014-07' LIMIT 2 TOK_QUERY TOK_FROM TOK_TABREF TOK_TABNAME mytable TOK_INSERT TOK_DESTINATION TOK_DIR TOK_TMP_FILE TOK_SELECT TOK_SELEXPR TOK_FUNCTION when = TOK_TABLE_OR_COL bit_field 1 1 - TOK_TABLE_OR_COL r_end TOK_TABLE_OR_COL r_start TOK_NULL TOK_WHERE = TOK_TABLE_OR_COL pkey '0178-2014-07' TOK_LIMIT 2 SQLState: null ErrorCode: 0 similarly, concat seems to work but I get a failure in this query (due to LPAD I believe) : select customer_id from mytable where pkey=concat_ws('-',LPAD('077',4,'0'),'2014-07') LIMIT 2 (there is something going on with the fact that the function is in the where clausethe following work fine: select concat_ws('-', LPAD(cast(112717 % 1024 AS STRING),4,'0'),'2014-07') from mytable where pkey='0077-2014-07' LIMIT 2 select customer_id from mytable where pkey=concat_ws('-','0077','2014-07') LIMIT 2 ) 14/10/03 14:51:35 ERROR server.SparkSQLOperationManager: Error executing query: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at org.apache.spark.sql.execution.Limit.execute(basicOperators.scala:146) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) at org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager$$anon$1.run(SparkSQLOperationManager.scala:185) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:193) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatement(HiveSessionImpl.java:175) at org.apache.hive.service.cli.CLIService.executeStatement(CLIService.java:150) at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:207) at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1133) at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1118) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:58) at org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:55) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) at 
org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:526) at org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:55) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.io.NotSerializableException: java.lang.reflect.Constructor at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508
Re: Help with using combineByKey
If you just want the ratio of positive to all values per key (if I'm reading right) this works:
val reduced = input.groupByKey().map(grp => grp._2.filter(v => v > 0).size.toFloat / grp._2.size)
reduced.foreach(println)
I don't think you need reduceByKey or combineByKey as you're not doing anything where the values depend on each other -- you're just counting... On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: I am a beginner to Spark and finding it difficult to implement a very simple reduce operation. I read that it is ideal to use combineByKey for complex reduce operations. My input:
val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88), ("KX",7), ("KX",6), ("KX",1), ("KX",9), ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0), ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
val opPart1 = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
opPart2.collectAsMap().map(println(_))
If the value is greater than 0, the first accumulator should be incremented by 1, else it remains the same. The second accumulator is a simple counter for each value. I am getting an incorrect output (garbage values) for the first accumulator. Please help. The equivalent reduce operation in Hadoop MapReduce is:
public static class PercentageCalcReducer extends Reducer<Text,IntWritable,Text,FloatWritable> { private FloatWritable pdelay = new FloatWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int acc2=0; float frac_delay, percentage_delay; int acc1=0; for(IntWritable val : values) { if(val.get() > 0) { acc1++; } acc2++; } frac_delay = (float)acc1/acc2; percentage_delay = frac_delay * 100; pdelay.set(percentage_delay); context.write(key,pdelay); } } Please help. Thank you for your time. -- Regards, Haripriya Ayyalasomayajula contact : 650-796-7112
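As a side note, the "garbage values" in the first accumulator most likely come from the createCombiner function: (v) => (v, 1) seeds the pair with the raw value itself instead of a 0/1 flag, so the positive count starts out wrong. A minimal sketch of the combineByKey variant, assuming the goal is the percentage of positive values per key (the sample data is abbreviated):
val input = sc.parallelize(List(("LAX", 6), ("LAX", 8), ("SFO", 0), ("SFO", 9)))
val counts = input.combineByKey(
  (v: Int) => (if (v > 0) 1 else 0, 1),                                          // createCombiner: seed (positiveCount, totalCount)
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),  // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)                   // mergeCombiners
)
// turn the (positiveCount, totalCount) pairs into percentages per key
val percentages = counts.mapValues { case (pos, total) => 100f * pos / total }
percentages.collectAsMap().foreach(println)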
Re: Problem executing Spark via JBoss application
From this line : Removing executor app-20141015142644-0125/0 because it is EXITED I would guess that you need to examine the executor log to see why the executor actually exited. My guess would be that the executor cannot connect back to your driver. But check the log from the executor. It should be in SPARK_HOME/work/app-id/executor_id/stderr on the worker box, I believe. On Wed, Oct 15, 2014 at 8:56 AM, Mehdi Singer mehdi.sin...@lampiris.be wrote: Hi, I have a Spark standalone example application which is working fine. I'm now trying to integrate this application into a J2EE application, deployed on JBoss 7.1.1 and accessed via a web service. The JBoss server is installed on my local machine (Windows 7) and the master Spark is remote (Linux). The example simply executes a count on my RDD. When I call the webservice I'm getting the following error at JBoss side when executing the count: 11:48:10,232 ERROR [org.apache.catalina.core.ContainerBase.[jboss.web].[default-host].[/el2-etrm-spark].[ws]] (http--127.0.0.1-8082-3) Servlet.service() pour la servlet ws a généré une exception: java.lang.RuntimeException: org.apache.cxf.interceptor.Fault: Job cancelled because SparkContext was shut down at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:116) [cxf-api-2.6.9.jar:2.6.9] at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:322) [cxf-api-2.6.9.jar:2.4.3] at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) [cxf-api-2.6.9.jar:2.6.9] at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:130) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:221) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doGet(AbstractHTTPServlet.java:146) [cxf-bundle-2.6.2.jar:2.6.2] at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) [jboss-servlet-api_3.0_spec-1.0.0.Final.jar:1.0.0.Final] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:197) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:329) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter.doFilterInternal(OpenEntityManagerInViewFilter.java:180) [spring-orm-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:186) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE] at 
org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:259) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:275) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:161) [jbossweb-7.0.13.Final.jar:]
Re: Exception Logging
You can put a try/catch block in the map function and log the exception. The only tricky part is that the exception log will be located on the executor machine. Even if you don't do any trapping you should see the exception stack trace in the executors' stderr log, which is visible through the UI (if your app crashes the executor you can still see it as the last executor that ran on a given worker). But things like println and logging work inside map -- you just have to remember that everything happens on the remote machine. On Thu, Oct 16, 2014 at 8:11 PM, Ge, Yao (Y.) y...@ford.com wrote: I need help to better trap exceptions in the map functions. What is the best way to catch the exception and provide some helpful diagnostic information, such as the source of the input (e.g. file name, and ideally line number if I am processing a text file)? -Yao
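For concreteness, here is a rough sketch of the try/catch-in-map idea; the path, the parsing logic and the column index are placeholders, not anything from the original thread:
val lines = sc.textFile("hdfs:///some/input.txt")   // hypothetical input path
val parsed = lines.flatMap { line =>
  try {
    Some(line.split("\t")(2).toInt)   // whatever parsing you actually do
  } catch {
    case e: Exception =>
      // this runs on the executor, so the message lands in that executor's stderr
      System.err.println(s"Failed to parse line [$line]: $e")
      None
  }
}
parsed.count()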
Re: create a Row Matrix
This works for me:
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val v1 = Vectors.dense(Array(1d,2d))
val v2 = Vectors.dense(Array(3d,4d))
val rows = sc.parallelize(List(v1,v2))
val mat = new RowMatrix(rows)
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(2, computeU = true)
On Wed, Oct 22, 2014 at 1:55 AM, viola viola.wiersc...@siemens.com wrote: Thanks for the quick response. However, I still only get error messages. I am able to load a .txt file with entries in it and use it in Spark, but I am not able to create a simple matrix, for instance a 2x2 row matrix [1 2 3 4]. I tried variations such as val RowMatrix = Matrix(2,2,array(1,3,2,4)) but it doesn't work.
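If the goal is just a small local 2x2 matrix rather than a distributed RowMatrix, something along these lines should also work -- this is only my guess at what the Matrix(2,2,array(1,3,2,4)) attempt was after (note that MLlib dense matrices take their values in column-major order):
import org.apache.spark.mllib.linalg.Matrices
// 2x2 dense matrix [[1.0, 2.0], [3.0, 4.0]], values supplied column by column
val m = Matrices.dense(2, 2, Array(1.0, 3.0, 2.0, 4.0))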
Problem packing spark-assembly jar
Hi folks, I'm trying to deploy the latest from master branch and having some trouble with the assembly jar. In the spark-1.1 official distribution(I use cdh version), I see the following jars, where spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar contains a ton of stuff: datanucleus-api-jdo-3.2.1.jar datanucleus-core-3.2.2.jar datanucleus-rdbms-3.2.1.jar spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar spark-examples-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar spark-hive-thriftserver_2.10-1.1.0.jar spark-hive_2.10-1.1.0.jar spark-sql_2.10-1.1.0.jar I tried to create a similar distribution off of master running mvn -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests -Pbigtop-dist package and ./make-distribution.sh -Pbigtop-dist -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests but in either case all I get in spark-assembly is near empty: spark_official/dist/lib$ jar -tvf spark-assembly-1.2.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.2.0.jar META-INF/ META-INF/MANIFEST.MF org/ org/apache/ org/apache/spark/ org/apache/spark/unused/ org/apache/spark/unused/UnusedStubClass.class META-INF/maven/ META-INF/maven/org.spark-project.spark/ META-INF/maven/org.spark-project.spark/unused/ META-INF/maven/org.spark-project.spark/unused/pom.xml META-INF/maven/org.spark-project.spark/unused/pom.properties META-INF/NOTICE Any advice on how to get spark-core and the rest packaged into the assembly jar -- I'd like to have fewer things to copy around.
Re: Problem packing spark-assembly jar
thanks -- that was it. I could swear this had worked for me before and indeed it's fixed this morning. On Fri, Oct 24, 2014 at 6:34 AM, Sean Owen so...@cloudera.com wrote: I imagine this is a side effect of the change that was just reverted, related to publishing the effective pom? sounds related but I don't know. On Fri, Oct 24, 2014 at 2:22 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, I'm trying to deploy the latest from master branch and having some trouble with the assembly jar. In the spark-1.1 official distribution(I use cdh version), I see the following jars, where spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar contains a ton of stuff: datanucleus-api-jdo-3.2.1.jar datanucleus-core-3.2.2.jar datanucleus-rdbms-3.2.1.jar spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar spark-examples-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar spark-hive-thriftserver_2.10-1.1.0.jar spark-hive_2.10-1.1.0.jar spark-sql_2.10-1.1.0.jar I tried to create a similar distribution off of master running mvn -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests -Pbigtop-dist package and ./make-distribution.sh -Pbigtop-dist -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests but in either case all I get in spark-assembly is near empty: spark_official/dist/lib$ jar -tvf spark-assembly-1.2.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.2.0.jar META-INF/ META-INF/MANIFEST.MF org/ org/apache/ org/apache/spark/ org/apache/spark/unused/ org/apache/spark/unused/UnusedStubClass.class META-INF/maven/ META-INF/maven/org.spark-project.spark/ META-INF/maven/org.spark-project.spark/unused/ META-INF/maven/org.spark-project.spark/unused/pom.xml META-INF/maven/org.spark-project.spark/unused/pom.properties META-INF/NOTICE Any advice on how to get spark-core and the rest packaged into the assembly jar -- I'd like to have fewer things to copy around.
[Spark SQL] Setting variables
Hi all, I'm trying to set a pool for a JDBC session. I'm connecting to the thrift server via JDBC client. My installation appears to be good(queries run fine), I can see the pools in the UI, but any attempt to set a variable (I tried spark.sql.shuffle.partitions and spark.sql.thriftserver.scheduler.pool) result in the exception below (trace is from Thriftserver log) Any thoughts on what I'm doing wrong? (I am on master, built today) SET spark.sql.thriftserver.scheduler.pool=mypool;select count(*) from mytable; == 14/10/24 18:17:10 ERROR server.SparkSQLOperationManager: Error executing query: java.lang.NullPointerException at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:309) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:244) at org.apache.spark.sql.execution.SetCommand.sideEffectResult$lzycompute(commands.scala:64) at org.apache.spark.sql.execution.SetCommand.sideEffectResult(commands.scala:55) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:44) at org.apache.spark.sql.execution.SetCommand.execute(commands.scala:51) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:357) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:357) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:104) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:99) at org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager$$anon$1.run(SparkSQLOperationManager.scala:172) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:193) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatement(HiveSessionImpl.java:175) at org.apache.hive.service.cli.CLIService.executeStatement(CLIService.java:150) at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:207) at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1133) at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1118) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:58) at org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:55) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) at org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:526) at org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:55) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
Re: scalac crash when compiling DataTypeConversions.scala
guoxu1231, I struggled with the IDEA problem for a full week. Same thing -- clean builds under Maven/sbt, but no luck with IDEA. What worked for me was the solution posted higher up in this thread -- it's a SO post that basically says to delete all iml files anywhere under the project directory. Let me know if you can't see this mail and I'll locate the exact SO post. On Mon, Oct 27, 2014 at 5:15 AM, guoxu1231 guoxu1...@gmail.com wrote: Hi Stephen, I tried it again. To avoid the profile impact, I executed mvn -DskipTests clean package with Hadoop 1.0.4 by default, then opened IDEA and imported it as a maven project, and I didn't choose any profile in the import wizard. Then "Make project" or re-build project in IDEA; unfortunately the DataTypeConversions.scala compile failed again. Is there an updated guide for using IntelliJ IDEA? I'm following "Building Spark with Maven" on the website.
Re: problem with start-slaves.sh
I see this when I start a worker and then try to start it again forgetting it's already running (I don't use start-slaves, I start the slaves individually with start-slave.sh). All this is telling you is that there is already a running process on that machine. You can see it if you do a ps -aef|grep worker you can look on the spark UI and see if your master shows this machine as connected to it already. If it doesn't, you might want to kill the worker process and restart it. On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build with PHive option to be able to interface with hive) I’m getting this ip_address: org.apache.spark.deploy.worker.Worker running as process . Stop it first. Am I doing something wrong? In my specific case, shark+hive is running on the nodes. Does that interfere with spark? Thank you,
Re: CANNOT FIND ADDRESS
CANNOT FIND ADDRESS occurs when your executor has crashed. I'd look further down where it shows each task and see if you see any tasks failed. Then you can examine the error log of that executor and see why it died. On Wed, Oct 29, 2014 at 9:35 AM, akhandeshi ami.khande...@gmail.com wrote: The Spark application UI shows that one of the executors "Cannot find Address":
Aggregated Metrics by Executor
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | mddworker1.c.fi-mdd-poc.internal:42197 | 0 ms | 0 | 0 | 0 | 0.0 B | 136.1 MB | 184.9 MB | 146.8 GB | 135.4 MB
1 | CANNOT FIND ADDRESS | 0 ms | 0 | 0 | 0 | 0.0 B | 87.4 MB | 142.0 MB | 61.4 GB | 81.4 MB
I also see the following in one of the executor logs, for which the driver may have lost communication.
14/10/29 13:18:33 WARN : Master_Client Heartbeat last execution took 90859 ms. Longer than the FIXED_EXECUTION_INTERVAL_MS 5000
14/10/29 13:18:33 WARN : WorkerClientToWorkerHeartbeat last execution took 90859 ms. Longer than the FIXED_EXECUTION_INTERVAL_MS 1000
14/10/29 13:18:33 WARN AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)
I have also seen other variations of timeouts:
14/10/29 06:21:05 WARN SendingConnection: Error finishing connection to mddworker1.c.fi-mdd-poc.internal/10.240.179.241:40442 java.net.ConnectException: Connection refused
14/10/29 06:21:05 ERROR BlockManager: Failed to report broadcast_6_piece0 to master; giving up. 
or 14/10/29 07:23:40 WARN AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [10 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:218) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:58) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:310) at org.apache.spark.storage.BlockManager$$anonfun$reportAllBlocks$3.apply(BlockManager.scala:190) at org.apache.spark.storage.BlockManager$$anonfun$reportAllBlocks$3.apply(BlockManager.scala:188) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at org.apache.spark.util.TimeStampedHashMap.foreach(TimeStampedHashMap.scala:107) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.storage.BlockManager.reportAllBlocks(BlockManager.scala:188) at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:207) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:366) How do I track down what is causing this problem? Any suggestion on solution, debugging or workaround will be helpful! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: problem with start-slaves.sh
Roberto, I don't think shark is an issue -- I have shark server running on a node that also acts as a worker. What you can do is turn off shark server, just run start-all to start your spark cluster. then you can try bin/spark-shell --master yourmasterip and see if you can successfully run some hello world stuff. This will verify you have a working Spark cluster. Shark is just an application on top of it, so I can't imagine that's what's causing interference. But stopping it is the simplest way to check. On Wed, Oct 29, 2014 at 10:54 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: hi Yana, in my case I did not start any spark worker. However, shark was definitely running. Do you think that might be a problem? I will take a look Thank you, -- *From:* Yana Kadiyska [yana.kadiy...@gmail.com] *Sent:* Wednesday, October 29, 2014 9:45 AM *To:* Pagliari, Roberto *Cc:* user@spark.apache.org *Subject:* Re: problem with start-slaves.sh I see this when I start a worker and then try to start it again forgetting it's already running (I don't use start-slaves, I start the slaves individually with start-slave.sh). All this is telling you is that there is already a running process on that machine. You can see it if you do a ps -aef|grep worker you can look on the spark UI and see if your master shows this machine as connected to it already. If it doesn't, you might want to kill the worker process and restart it. On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build with PHive option to be able to interface with hive) I’m getting this ip_address: org.apache.spark.deploy.worker.Worker running as process . Stop it first. Am I doing something wrong? In my specific case, shark+hive is running on the nodes. Does that interfere with spark? Thank you,
Re: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2
Adrian, do you know if this is documented somewhere? I was also under the impression that setting a key's value to None would cause the key to be discarded (without any explicit filtering on the user's part) but cannot find any official documentation to that effect. On Wed, Nov 12, 2014 at 2:43 PM, Adrian Mocanu amoc...@verticalscope.com wrote: My understanding is that the reason you have an Option is so you could filter out tuples when None is returned. This way your state data won't grow forever. -Original Message- From: spr [mailto:s...@yarcdata.com] Sent: November-12-14 2:25 PM To: u...@spark.incubator.apache.org Subject: Re: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2 After comparing with previous code, I got it to work by making the return a Some instead of a Tuple2. Perhaps some day I will understand this. spr wrote --code
val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum
val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min
val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
// (currentCount + previousCount, Seq(minTime, newMinTime).min) == old
Some(currentCount + previousCount, Seq(minTime, newMinTime).min) // == new
}
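For what it's worth, a small sketch of an update function that drops idle keys by returning None -- the value type and the "no new values this batch" rule are made up for illustration, not from the thread:
// state value: (count, lastSeen); a key with no new values in this batch is dropped from the state
val update = (values: Seq[(Int, Long)], state: Option[(Int, Long)]) => {
  if (values.isEmpty) {
    None   // returning None removes this key from the state RDD
  } else {
    val newCount = state.map(_._1).getOrElse(0) + values.map(_._1).sum
    val lastSeen = values.map(_._2).max
    Some((newCount, lastSeen))
  }
}
// usage: pairDStream.updateStateByKey(update)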
[SQL] HiveThriftServer2 failure detection
Hi all, I am running HiveThriftServer2 and noticed that the process stays up even though there is no driver connected to the Spark master. I started the server via sbin/start-thriftserver and my namenodes are currently not operational. I can see from the log that there was an error on startup: 14/11/19 16:32:58 ERROR HiveThriftServer2: Error starting HiveThriftServer2 and that the driver shut down as expected: 14/11/19 16:32:59 INFO SparkUI: Stopped Spark web UI at http://myip:4040 14/11/19 16:32:59 INFO DAGScheduler: Stopping DAGScheduler 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/11/19 16:33:00 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/11/19 16:33:00 INFO MemoryStore: MemoryStore cleared 14/11/19 16:33:00 INFO BlockManager: BlockManager stopped 14/11/19 16:33:00 INFO BlockManagerMaster: BlockManagerMaster stopped 14/11/19 16:33:00 INFO SparkContext: Successfully stopped SparkContext However, when I try to run start-thriftserver.sh again I see an error message that the process is already running and indeed there is a process with that PID: root 32334 1 0 16:32 ?00:00:00 /usr/local/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master spark://myip:7077 --conf -spark.executor.extraJavaOptions=-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps spark-internal --hiveconf hive.root.logger=INFO,console Is this a bug or design decision -- I am upgrading from Shark and we had scripts that monitor the driver and restart on failure. Here it seems that we would not be able to restart even though the driver died?
Re: [SQL] HiveThriftServer2 failure detection
https://issues.apache.org/jira/browse/SPARK-4497 On Wed, Nov 19, 2014 at 1:48 PM, Michael Armbrust mich...@databricks.com wrote: This is not by design. Can you please file a JIRA? On Wed, Nov 19, 2014 at 9:19 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi all, I am running HiveThriftServer2 and noticed that the process stays up even though there is no driver connected to the Spark master. I started the server via sbin/start-thriftserver and my namenodes are currently not operational. I can see from the log that there was an error on startup: 14/11/19 16:32:58 ERROR HiveThriftServer2: Error starting HiveThriftServer2 and that the driver shut down as expected: 14/11/19 16:32:59 INFO SparkUI: Stopped Spark web UI at http://myip:4040 14/11/19 16:32:59 INFO DAGScheduler: Stopping DAGScheduler 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/11/19 16:33:00 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/11/19 16:33:00 INFO MemoryStore: MemoryStore cleared 14/11/19 16:33:00 INFO BlockManager: BlockManager stopped 14/11/19 16:33:00 INFO BlockManagerMaster: BlockManagerMaster stopped 14/11/19 16:33:00 INFO SparkContext: Successfully stopped SparkContext However, when I try to run start-thriftserver.sh again I see an error message that the process is already running and indeed there is a process with that PID: root 32334 1 0 16:32 ?00:00:00 /usr/local/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master spark://myip:7077 --conf -spark.executor.extraJavaOptions=-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps spark-internal --hiveconf hive.root.logger=INFO,console Is this a bug or design decision -- I am upgrading from Shark and we had scripts that monitor the driver and restart on failure. Here it seems that we would not be able to restart even though the driver died?
[SQL]Proper use of spark.sql.thriftserver.scheduler.pool
Hi sparkers, I'm trying to use spark.sql.thriftserver.scheduler.pool for the first time (earlier I was stuck because of https://issues.apache.org/jira/browse/SPARK-4037). I have two pools set up and would like to issue a query against the low-priority pool. I am doing this (tried both from beeline and a different JDBC client, output below is from beeline):
SET spark.sql.thriftserver.scheduler.pool=CRON;
+-+
| spark.sql.thriftserver.scheduler.pool=CRON |
+-+
1 row selected (0.09 seconds)
1: jdbc:hive2://myip:10001> select count(*) from mytable;
The query executes OK but does not execute against the CRON pool... Am I misusing this setting? (My goal is to be able to allocate a large set of cores to the Thriftserver but separate out some housekeeping tasks to a low-priority pool.) Thanks for any tips.
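For comparison, outside of the Thriftserver the fair-scheduler pool is normally chosen per thread with setLocalProperty. This sketch assumes spark.scheduler.mode=FAIR and a pool named CRON defined in fairscheduler.xml; it shows the mechanism in a plain SparkContext application, not necessarily what the Thriftserver setting maps to internally:
// run the next jobs submitted from this thread in the CRON pool
sc.setLocalProperty("spark.scheduler.pool", "CRON")
sqlContext.sql("select count(*) from mytable").collect()
// clear the property to go back to the default pool
sc.setLocalProperty("spark.scheduler.pool", null)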
[SQL] Wildcards in SQLContext.parquetFile?
Hi folks, I'm wondering if someone has successfully used wildcards with a parquetFile call? I saw this thread and it makes me think no? http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3CCACA1tWLjcF-NtXj=pqpqm3xk4aj0jitxjhmdqbojj_ojybo...@mail.gmail.com%3E I have a set of parquet files that are partitioned by key. I'd like to issue a query to read in a subset of the files, based on a directory wildcard (the wildcard will be a little more specific than * but this is to show the issue). This call works fine:
sc.textFile("hdfs:///warehouse/hive/*/*/*.parquet").first
res4: String = PAR1? L??? ?\??? , ,a??aL0?xU???e??
but this doesn't:
scala> val parquetFile = sqlContext.parquetFile("hdfs:///warehouse/hive/*/*/*.parquet").first
java.io.FileNotFoundException: File hdfs://cdh4-14822-nn/warehouse/hive/*/*/*.parquet does not exist
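One possible workaround (just a sketch, not something verified in this thread) is to expand the glob yourself with the Hadoop FileSystem API and union the resulting SchemaRDDs:
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
// expand the wildcard into concrete paths
val paths = fs.globStatus(new Path("hdfs:///warehouse/hive/*/*/*.parquet")).map(_.getPath.toString)
// load each piece and union them into a single SchemaRDD
val combined = paths.map(sqlContext.parquetFile).reduce(_ unionAll _)
combined.registerTempTable("mytable_subset")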
Re: Cluster getting a null pointer error
does spark-submit with SparkPi and spark-examples.jar work? e.g. ./spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://xx.xx.xx.xx:7077 /path/to/examples.jar On Tue, Dec 9, 2014 at 6:58 PM, Eric Tanner eric.tan...@justenough.com wrote: I have set up a cluster on AWS and am trying a really simple hello world program as a test. The cluster was built using the ec2 scripts that come with Spark. Anyway, I have output the error message (using --verbose) below. The source code is further below that. Any help would be greatly appreciated. Thanks, Eric *Error code:* r...@ip-xx.xx.xx.xx ~]$ ./spark/bin/spark-submit --verbose --class com.je.test.Hello --master spark://xx.xx.xx.xx:7077 Hello-assembly-1.0.jar Spark assembly has been built with Hive, including Datanucleus jars on classpath Using properties file: /root/spark/conf/spark-defaults.conf Adding default property: spark.executor.memory=5929m Adding default property: spark.executor.extraClassPath=/root/ephemeral-hdfs/conf Adding default property: spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/ Using properties file: /root/spark/conf/spark-defaults.conf Adding default property: spark.executor.memory=5929m Adding default property: spark.executor.extraClassPath=/root/ephemeral-hdfs/conf Adding default property: spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/ Parsed arguments: master spark://xx.xx.xx.xx:7077 deployMode null executorMemory 5929m executorCores null totalExecutorCores null propertiesFile /root/spark/conf/spark-defaults.conf extraSparkPropertiesMap() driverMemorynull driverCores null driverExtraClassPathnull driverExtraLibraryPath null driverExtraJavaOptions null supervise false queue null numExecutorsnull files null pyFiles null archivesnull mainClass com.je.test.Hello primaryResource file:/root/Hello-assembly-1.0.jar namecom.je.test.Hello childArgs [] jarsnull verbose true Default properties from /root/spark/conf/spark-defaults.conf: spark.executor.extraLibraryPath - /root/ephemeral-hdfs/lib/native/ spark.executor.memory - 5929m spark.executor.extraClassPath - /root/ephemeral-hdfs/conf Using properties file: /root/spark/conf/spark-defaults.conf Adding default property: spark.executor.memory=5929m Adding default property: spark.executor.extraClassPath=/root/ephemeral-hdfs/conf Adding default property: spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/ Main class: com.je.test.Hello Arguments: System properties: spark.executor.extraLibraryPath - /root/ephemeral-hdfs/lib/native/ spark.executor.memory - 5929m SPARK_SUBMIT - true spark.app.name - com.je.test.Hello spark.jars - file:/root/Hello-assembly-1.0.jar spark.executor.extraClassPath - /root/ephemeral-hdfs/conf spark.master - spark://xxx.xx.xx.xxx:7077 Classpath elements: file:/root/Hello-assembly-1.0.jar *Actual Error:* Exception in thread main java.lang.NullPointerException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) *Source Code:* package com.je.test import org.apache.spark.{SparkConf, SparkContext} class Hello { def main(args: Array[String]): Unit = { val conf = new 
SparkConf(true) //.set("spark.cassandra.connection.host", "xxx.xx.xx.xxx")
val sc = new SparkContext("spark://xxx.xx.xx.xxx:7077", "Season", conf)
println("Hello World")
}
}
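One thing that stands out in the snippet: Hello is declared as a class, but spark-submit looks up and invokes a static main method, so with a class there is no object to invoke it on and the reflective call can fail with exactly this kind of NullPointerException inside SparkSubmit.launch. That is only a guess from the posted code; a sketch of the fix would be to make it an object:
package com.je.test

import org.apache.spark.{SparkConf, SparkContext}

object Hello {   // object, not class, so main is effectively static
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
    val sc = new SparkContext("spark://xxx.xx.xx.xxx:7077", "Season", conf)
    println("Hello World")
    sc.stop()
  }
}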
Trouble with cache() and parquet
Hi folks, wondering if anyone has thoughts. Trying to create something akin to a materialized view (sqlContext is a HiveContext connected to external metastore): val last2HourRdd = sqlContext.sql(sselect * from mytable) //last2HourRdd.first prints out a org.apache.spark.sql.Row = [...] with valid data last2HourRdd.cache() //last2HourRdd.first now fails in an executor with the following: In the driver: 14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0 (TID 35, iphere, NODE_LOCAL, 2170 bytes) 14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0 (TID 35) on executor iphere: java.lang.ClassCastException (null) [duplicate 1] And in executor: 14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 27) java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) at org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73) at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231) at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236) at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328) at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310) at org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168) at org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37) at org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64) at org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54) at org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64) at org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52) at org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275) at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.Limit$$anonfun$4.apply(basicOperators.scala:141) at org.apache.spark.sql.execution.Limit$$anonfun$4.apply(basicOperators.scala:141) at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) Any thoughts on this? Not sure if using the external metastore for the inital pull is a problem or if I'm just hitting a bug.
Re: Trouble with cache() and parquet
I see -- they are the same in design but the difference comes from partitioned Hive tables: when the RDD is generated by querying an external Hive metastore, the partition is appended as part of the row, and shows up as part of the schema. Can you shed some light on why this is a problem: last2HourRdd.first -- works ok last2HourRdd.cache() last2HourRdd.first -- does not work The first call shows K+1 columns (and so does print schema, where K columns are from the backing parquet files and the K+1st is the partition inlined. My impression is that the second call to .first would just force the cache() call and dump out that RDD to disk (with all of it's K+1 columns and store the schema info, again with K+1 columns), and then just return a single entry. I am not sure why the fact that Hive metastore exposes an extra column over the raw parquet file is a problem since it does so both on the schema and in the data: last2HourRdd.schema.fields.length reports K+1, and so does last2HourRdd.first.length. I also tried calling sqlContext.applySchema(last2HourRdd,parquetFile.schema) before caching but it does not fix the issue. The only workaround I've come up with so far is to replace select * with a select list_of_columns. But I'd love to understand a little better why the cache call trips this scenario On Wed, Dec 10, 2014 at 3:50 PM, Michael Armbrust mich...@databricks.com wrote: Have you checked to make sure the schema in the metastore matches the schema in the parquet file? One way to test would be to just use sqlContext.parquetFile(...) which infers the schema from the file instead of using the metastore. On Wed, Dec 10, 2014 at 12:46 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, wondering if anyone has thoughts. Trying to create something akin to a materialized view (sqlContext is a HiveContext connected to external metastore): val last2HourRdd = sqlContext.sql(sselect * from mytable) //last2HourRdd.first prints out a org.apache.spark.sql.Row = [...] 
with valid data last2HourRdd.cache() //last2HourRdd.first now fails in an executor with the following: In the driver: 14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0 (TID 35, iphere, NODE_LOCAL, 2170 bytes) 14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0 (TID 35) on executor iphere: java.lang.ClassCastException (null) [duplicate 1] And in executor: 14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 27) java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) at org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73) at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231) at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236) at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328) at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310) at org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168) at org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37) at org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64) at org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54) at org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64) at org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52) at org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275) at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157
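Until the underlying issue is sorted out, the explicit-column workaround mentioned above looks roughly like this (the column and table names are placeholders):
// listing the columns, including the partition column, instead of select *
val last2HourRdd = sqlContext.sql("select col1, col2, partition_col from mytable")
last2HourRdd.cache()
last2HourRdd.first   // avoids the ClassCastException seen with select *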
Re: Nabble mailing list mirror errors: This post has NOT been accepted by the mailing list yet
Since you mentioned this, I had a related quandry recently -- it also says that the forum archives *u...@spark.incubator.apache.org u...@spark.incubator.apache.org/* *d...@spark.incubator.apache.org d...@spark.incubator.apache.org *respectively, yet the Community page clearly says to email the @spark.apache.org list (but the nabble archive is linked right there too). IMO even putting a clear explanation at the top Posting here requires that you create an account via the UI. Your message will be sent to both spark.incubator.apache.org and spark.apache.org (if that is the case, i'm not sure which alias nabble posts get sent to) would make things a lot more clear. On Sat, Dec 13, 2014 at 5:05 PM, Josh Rosen rosenvi...@gmail.com wrote: I've noticed that several users are attempting to post messages to Spark's user / dev mailing lists using the Nabble web UI ( http://apache-spark-user-list.1001560.n3.nabble.com/). However, there are many posts in Nabble that are not posted to the Apache lists and are flagged with This post has NOT been accepted by the mailing list yet. errors. I suspect that the issue is that users are not completing the sign-up confirmation process ( http://apache-spark-user-list.1001560.n3.nabble.com/mailing_list/MailingListOptions.jtp?forum=1), which is preventing their emails from being accepted by the mailing list. I wanted to mention this issue to the Spark community to see whether there are any good solutions to address this. I have spoken to users who think that our mailing list is unresponsive / inactive because their un-posted messages haven't received any replies. - Josh
Re: Limit the # of columns in Spark Scala
Denny, I am not sure what exception you're observing but I've had luck with 2 things:
val table = sc.textFile("hdfs://")
You can try calling table.first here and you'll see the first line of the file. You can also do
val debug = table.first.split("\t")
which would give you an array and you can indeed verify that the array contains what you want in positions 167, 119 and 200. In the case of large files with a random bad line I find wrapping the call within the map in try/catch very valuable -- you can dump out the whole line in the catch statement. Lastly, I would guess that you're getting a compile error and not a runtime error -- I believe c is an array of values, so I think you want tabs.map(c => (c(167), c(110), c(200))) instead of tabs.map(c => (c._(167), c._(110), c._(200))). On Sun, Dec 14, 2014 at 3:12 PM, Denny Lee denny.g@gmail.com wrote: Yes - that works great! Sorry for implying I couldn't. Was just more flummoxed that I couldn't make the Scala call work on its own. Will continue to debug ;-) On Sun, Dec 14, 2014 at 11:39 Michael Armbrust mich...@databricks.com wrote: BTW, I cannot use SparkSQL / case right now because my table has 200 columns (and I'm on Scala 2.10.3) You can still apply the schema programmatically: http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
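Putting the two suggestions together, a small sketch -- the column positions and the tab delimiter are from the thread, while the input path and error handling are placeholders:
val table = sc.textFile("hdfs:///path/to/input")   // placeholder path
val projected = table.flatMap { line =>
  try {
    val c = line.split("\t")
    Some((c(167), c(110), c(200)))   // keep only the columns of interest
  } catch {
    case e: Exception =>
      // printed on the executor, so check that executor's stderr
      System.err.println(s"Bad line [$line]: $e")
      None
  }
}
projected.take(5).foreach(println)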
Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions
if you're running the test via sbt you can examine the classpath that sbt uses for the test (show runtime:full-classpath or last run) -- I find this helps once too many includes and excludes interact. On Thu, Jan 22, 2015 at 3:50 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I use spark 1.1.0-SNAPSHOT and the test I'm running is in local mode. My test case uses org.apache.spark.streaming.TestSuiteBase
val spark = "org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkStreaming = "org.apache.spark" % "spark-streaming_2.10" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkCassandra = "com.tuplejump" % "calliope_2.10" % "0.9.0-C2-EA" exclude("org.apache.cassandra", "cassandra-all") exclude("org.apache.cassandra", "cassandra-thrift")
val casAll = "org.apache.cassandra" % "cassandra-all" % "2.0.3" intransitive()
val casThrift = "org.apache.cassandra" % "cassandra-thrift" % "2.0.3" intransitive()
val sparkStreamingFromKafka = "org.apache.spark" % "spark-streaming-kafka_2.10" % "0.9.1" excludeAll(
-Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: January-22-15 11:39 AM To: Adrian Mocanu Cc: u...@spark.incubator.apache.org Subject: Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions NoSuchMethodError almost always means that you have compiled some code against one version of a library but are running against another. I wonder if you are including different versions of Spark in your project, or running against a cluster on an older version? On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi I get this exception when I run a Spark test case on my local machine: An exception or error caused a run to abort: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;
java.lang.NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;
In my test case I have these Spark-related imports:
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.TestSuiteBase
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-Adrian
Re: Results never return to driver | Spark Custom Reader
It looks to me like your executor actually crashed and didn't just finish properly. Can you check the executor log? It is available in the UI, or on the worker machine, under $SPARK_HOME/work/ app-20150123155114-/6/stderr (unless you manually changed the work directory location but in that case I'd assume you know where to find the log) On Thu, Jan 22, 2015 at 10:54 PM, Harihar Nahak hna...@wynyardgroup.com wrote: Hi All, I wrote a custom reader to read a DB, and it is able to return key and value as expected but after it finished it never returned to driver here is output of worker log : 15/01/23 15:51:38 INFO worker.ExecutorRunner: Launch command: java -cp ::/usr/local/spark-1.2.0-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/hadoop/etc/hadoop -XX:MaxPermSize=128m -Dspark.driver.port=53484 -Xms1024M -Xmx1024M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@VM90:53484/user/CoarseGrainedScheduler 6 VM99 4 app-20150123155114- akka.tcp://sparkWorker@VM99:44826/user/Worker 15/01/23 15:51:47 INFO worker.Worker: Executor app-20150123155114-/6 finished with state EXITED message Command exited with code 1 exitStatus 1 15/01/23 15:51:47 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@VM99:57695] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/01/23 15:51:47 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40143.96.25.29%3A35065-4#-915179653] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 15/01/23 15:51:49 INFO worker.Worker: Asked to kill unknown executor app-20150123155114-/6 If someone noticed any clue to fixed that will really appreciate. - --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Results-never-return-to-driver-Spark-Custom-Reader-tp21328.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-shell has syntax error on windows.
https://issues.apache.org/jira/browse/SPARK-5389 I marked as minor since I also just discovered that I can run it under PowerShell just fine. Vladimir, feel free to change the bug if you're getting a different message or a more serious issue. On Fri, Jan 23, 2015 at 4:44 PM, Josh Rosen rosenvi...@gmail.com wrote: Do you mind filing a JIRA issue for this which includes the actual error message string that you saw? https://issues.apache.org/jira/browse/SPARK On Thu, Jan 22, 2015 at 8:31 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I am not sure if you get the same exception as I do -- spark-shell2.cmd works fine for me. Windows 7 as well. I've never bothered looking to fix it as it seems spark-shell just calls spark-shell2 anyway... On Thu, Jan 22, 2015 at 3:16 AM, Vladimir Protsenko protsenk...@gmail.com wrote: I have a problem with running spark shell in windows 7. I made the following steps: 1. downloaded and installed Scala 2.11.5 2. downloaded spark 1.2.0 by git clone git://github.com/apache/spark.git 3. run dev/change-version-to-2.11.sh and mvn -Dscala-2.11 -DskipTests clean package (in git bash) After installation tried to run spark-shell.cmd in cmd shell and it says there is a syntax error in file. What could I do to fix problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-has-syntax-error-on-windows-tp21313.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?
If you're talking about filter pushdowns for parquet files, this also has to be turned on explicitly. Try spark.sql.parquet.filterPushdown=true. It's off by default. On Mon, Jan 19, 2015 at 3:46 AM, Xiaoyu Wang wangxy...@gmail.com wrote: Yes it works! But the filter can't be pushed down!!! Does a custom parquet inputformat have to implement the data sources API? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 2015-01-16 21:51 GMT+08:00 Xiaoyu Wang wangxy...@gmail.com: Thanks yana! I will try it! On Jan 16, 2015, at 20:51, yana yana.kadiy...@gmail.com wrote: I think you might need to set spark.sql.hive.convertMetastoreParquet to false if I understand that flag correctly. Original message From: Xiaoyu Wang Date: 01/16/2015 5:09 AM (GMT-05:00) To: user@spark.apache.org Subject: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan? Hi all! In Spark SQL 1.2.0 I create a hive table with custom parquet inputformat and outputformat, like this:
CREATE TABLE test( id string, msg string) CLUSTERED BY (id) SORTED BY (id ASC) INTO 10 BUCKETS ROW FORMAT SERDE 'com.a.MyParquetHiveSerDe' STORED AS INPUTFORMAT 'com.a.MyParquetInputFormat' OUTPUTFORMAT 'com.a.MyParquetOutputFormat';
And in the spark shell the plan of select * from test is:
[== Physical Plan ==]
[!OutputFaker [id#5,msg#6]]
[ ParquetTableScan [id#12,msg#13], (ParquetRelation hdfs://hadoop/user/hive/warehouse/test.db/test, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]
Not HiveTableScan! So it doesn't execute my custom inputformat! Why? How can it execute my custom inputformat? Thanks!
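For reference, both flags mentioned in this thread are ordinary SQL conf settings, so they can be flipped from code as well as via SET; a small sketch against a HiveContext:
// disable the automatic conversion of metastore parquet tables to Spark's native scan
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
// enable parquet filter pushdown (off by default, as noted above)
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
// the SQL equivalent of the second setting:
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")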
Re: Why Parquet Predicate Pushdown doesn't work?
Just wondering if you've made any progress on this -- I'm having the same issue. My attempts to help myself are documented here http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAJ4HpHFVKvdNgKes41DvuFY=+f_nTJ2_RT41+tadhNZx=bc...@mail.gmail.com%3E . I don't believe I have the value scattered through all blocks issue either as running with sc.hadoopConfiguration.set(parquet.task.side.metadata,false) shows a much smaller Input size for me and it is the exact same parquet files being scanned. On Thu, Jan 8, 2015 at 1:40 AM, Xuelin Cao xuelincao2...@gmail.com wrote: Yes, the problem is, I've turned the flag on. One possible reason for this is, the parquet file supports predicate pushdown by setting statistical min/max value of each column on parquet blocks. If in my test, the groupID=10113000 is scattered in all parquet blocks, then the predicate pushdown fails. But, I'm not quite sure about that. I don't know whether there is any other reason that can lead to this. On Wed, Jan 7, 2015 at 10:14 PM, Cody Koeninger c...@koeninger.org wrote: But Xuelin already posted in the original message that the code was using SET spark.sql.parquet.filterPushdown=true On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com wrote: Quoting Michael: Predicate push down into the input format is turned off by default because there is a bug in the current parquet library that null pointers when there are full row groups that are null. https://issues.apache.org/jira/browse/SPARK-4258 You can turn it on if you want: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration Daniel On 7 בינו׳ 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID wrote: Hi, I'm testing parquet file format, and the predicate pushdown is a very useful feature for us. However, it looks like the predicate push down doesn't work after I set sqlContext.sql(SET spark.sql.parquet.filterPushdown=true) Here is my sql: *sqlContext.sql(select adId, adTitle from ad where groupId=10113000).collect* Then, I checked the amount of input data on the WEB UI. But the amount of input data is ALWAYS 80.2M regardless whether I turn the spark.sql.parquet.filterPushdown flag on or off. I'm not sure, if there is anything that I must do when *generating *the parquet file in order to make the predicate pushdown available. (Like ORC file, when creating the ORC file, I need to explicitly sort the field that will be used for predicate pushdown) Anyone have any idea? And, anyone knows the internal mechanism for parquet predicate pushdown? Thanks
Re: Issues with constants in Spark HiveQL queries
yeah, that makes sense. Pala, are you on a prebuild version of Spark -- I just tried the CDH4 prebuilt...Here is what I get for the = token: [image: Inline image 1] The literal type shows as 290, not 291, and 290 is numeric. According to this http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser 291 is token PLUS which is really weird... On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao hao.ch...@intel.com wrote: The log showed it failed in parsing, so the typo stuff shouldn’t be the root cause. BUT I couldn’t reproduce that with master branch. I did the test as follow: sbt/sbt –Phadoop-2.3.0 –Phadoop-2.3 –Phive –Phive-0.13.1 hive/console scala sql(“SELECT user_id FROM actions where conversion_aciton_id=20141210”) sbt/sbt –Phadoop-2.3.0 –Phadoop-2.3 –Phive –Phive-0.12.0 hive/console scala sql(“SELECT user_id FROM actions where conversion_aciton_id=20141210”) *From:* Yana Kadiyska [mailto:yana.kadiy...@gmail.com] *Sent:* Wednesday, January 14, 2015 11:12 PM *To:* Pala M Muthaia *Cc:* user@spark.apache.org *Subject:* Re: Issues with constants in Spark HiveQL queries Just a guess but what is the type of conversion_aciton_id? I do queries over an epoch all the time with no issues(where epoch's type is bigint). You can see the source here https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- not sure what ASTNode type: 291 but it sounds like it's not considered numeric? If it's a string it should be conversion_aciton_id=*'*20141210*' *(single quotes around the string) On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some simple queries successfully, but we hit the following issue whenever we attempt to use a constant in the query predicate. It seems like an issue with parsing constant. Query: SELECT user_id FROM actions where conversion_aciton_id=20141210 Error: scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210 Any ideas? This seems very basic, so we may be missing something basic, but i haven't figured out what it is. 
--- Full shell output below: scala sqlContext.sql(SELECT user_id FROM actions where conversion_aciton_id=20141210) 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210 15/01/13 16:55:54 INFO ParseDriver: Parse Completed 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210 15/01/13 16:55:54 INFO ParseDriver: Parse Completed java.lang.RuntimeException: Unsupported language features in query: SELECT user_id FROM actions where conversion_aciton_id=20141210 TOK_QUERY TOK_FROM TOK_TABREF TOK_TABNAME actions TOK_INSERT TOK_DESTINATION TOK_DIR TOK_TMP_FILE TOK_SELECT TOK_SELEXPR TOK_TABLE_OR_COL user_id TOK_WHERE = TOK_TABLE_OR_COL conversion_aciton_id 20141210 scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210 + org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110) at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57
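For completeness, the quoting suggested above would look like this, assuming conversion_aciton_id is in fact a string column:

sqlContext.sql("SELECT user_id FROM actions WHERE conversion_aciton_id = '20141210'")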
Re: Issues with constants in Spark HiveQL queries
Just a guess but what is the type of conversion_aciton_id? I do queries over an epoch all the time with no issues(where epoch's type is bigint). You can see the source here https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- not sure what ASTNode type: 291 but it sounds like it's not considered numeric? If it's a string it should be conversion_aciton_id=*'*20141210*' *(single quotes around the string) On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some simple queries successfully, but we hit the following issue whenever we attempt to use a constant in the query predicate. It seems like an issue with parsing constant. Query: SELECT user_id FROM actions where conversion_aciton_id=20141210 Error: scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210 Any ideas? This seems very basic, so we may be missing something basic, but i haven't figured out what it is. --- Full shell output below: scala sqlContext.sql(SELECT user_id FROM actions where conversion_aciton_id=20141210) 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210 15/01/13 16:55:54 INFO ParseDriver: Parse Completed 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210 15/01/13 16:55:54 INFO ParseDriver: Parse Completed java.lang.RuntimeException: Unsupported language features in query: SELECT user_id FROM actions where conversion_aciton_id=20141210 TOK_QUERY TOK_FROM TOK_TABREF TOK_TABNAME actions TOK_INSERT TOK_DESTINATION TOK_DIR TOK_TMP_FILE TOK_SELECT TOK_SELEXPR TOK_TABLE_OR_COL user_id TOK_WHERE = TOK_TABLE_OR_COL conversion_aciton_id 20141210 scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210 + org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110) at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at
Re: Using Spark SQL with multiple (avro) files
If the wildcard path you have doesn't work you should probably open a bug -- I had a similar problem with Parquet and it was a bug which recently got closed. Not sure if sqlContext.avroFile shares a codepath with .parquetFile...you can try running with bits that have the fix for .parquetFile or look at the source... Here was my question for reference: http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3ccaaswr-5rfmu-y-7htluj2eqqaecwjs8jh+irrzhm7g1ex7v...@mail.gmail.com%3E On Wed, Jan 14, 2015 at 4:34 AM, David Jones letsnumsperi...@gmail.com wrote: Hi, I have a program that loads a single avro file using spark SQL, queries it, transforms it and then outputs the data. The file is loaded with: val records = sqlContext.avroFile(filePath) val data = records.registerTempTable(data) ... Now I want to run it over tens of thousands of Avro files (all with schemas that contain the fields I'm interested in). Is it possible to load multiple avro files recursively from a top-level directory using wildcards? All my avro files are stored under s3://my-bucket/avros/*/DATE/*.avro, and I want to run my task across all of these on EMR. If that's not possible, is there some way to load multiple avro files into the same table/RDD so the whole dataset can be processed (and in that case I'd supply paths to each file concretely, but I *really* don't want to have to do that). Thanks David
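Until the wildcard behavior is sorted out, one possible workaround is to load concrete paths individually and union them. This is only a sketch, assuming the sqlContext.avroFile call (and its import) from the program above; the paths are made up:

val paths = Seq(
  "s3://my-bucket/avros/a/2015-01-01/part-00000.avro",
  "s3://my-bucket/avros/b/2015-01-01/part-00000.avro")  // illustrative paths
// union the per-file SchemaRDDs into one and register it as a single table
val records = paths.map(p => sqlContext.avroFile(p)).reduce(_ unionAll _)
records.registerTempTable("data")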
Re: spark-shell has syntax error on windows.
I am not sure if you get the same exception as I do -- spark-shell2.cmd works fine for me. Windows 7 as well. I've never bothered looking to fix it as it seems spark-shell just calls spark-shell2 anyway... On Thu, Jan 22, 2015 at 3:16 AM, Vladimir Protsenko protsenk...@gmail.com wrote: I have a problem with running spark shell in windows 7. I made the following steps: 1. downloaded and installed Scala 2.11.5 2. downloaded spark 1.2.0 by git clone git://github.com/apache/spark.git 3. run dev/change-version-to-2.11.sh and mvn -Dscala-2.11 -DskipTests clean package (in git bash) After installation tried to run spark-shell.cmd in cmd shell and it says there is a syntax error in file. What could I do to fix problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-has-syntax-error-on-windows-tp21313.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [SparkSQL] Try2: Parquet predicate pushdown troubles
Thanks for looking Cheng. Just to clarify in case other people need this sooner, setting sc.hadoopConfiguration.set(parquet.task.side.metadata, false)did work well in terms of dropping rowgroups/showing small input size. What was odd about that is that the overall time wasn't much better...but maybe that was overhead from sending the metadata clientside. Thanks again and looking forward to your fix On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Yana, Sorry for the late reply, missed this important thread somehow. And many thanks for reporting this. It turned out to be a bug — filter pushdown is only enabled when using client side metadata, which is not expected, because task side metadata code path is more performant. And I guess that the reason why setting parquet.task.side.metadata to false didn’t reduce input size for you is because you set the configuration with Spark API, or put it into spark-defaults.conf. This configuration goes to Hadoop Configuration, and Spark only merge properties whose names start with spark.hadoop into Hadoop Configuration instances. You may try to put parquet.task.side.metadata config into Hadoop core-site.xml, and then re-run the query. I can see significant differences by doing so. I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for reporting all the details! Cheng On 1/13/15 12:56 PM, Yana Kadiyska wrote: Attempting to bump this up in case someone can help out after all. I spent a few good hours stepping through the code today, so I'll summarize my observations both in hope I get some help and to help others that might be looking into this: 1. I am setting *spark.sql.parquet.**filterPushdown=true* 2. I can see by stepping through the driver debugger that PaquetTableOperations.execute sets the filters via ParquetInputFormat.setFilterPredicate (I checked the conf object, things appear OK there) 3. In FilteringParquetRowInputFormat, I get through the codepath for getTaskSideSplits. It seems that the codepath for getClientSideSplits would try to drop rowGroups but I don't see similar in getTaskSideSplit. Does anyone have pointers on where to look after this? Where is rowgroup filtering happening in the case of getTaskSideSplits? I can attach to the executor but am not quite sure what code related to Parquet gets called executor side...also don't see any messages in the executor logs related to Filtering predicates. For comparison, I went through the getClientSideSplits and can see that predicate pushdown works OK: sc.hadoopConfiguration.set(parquet.task.side.metadata,false) 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800) 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %) ! Is it possible that this is just a UI bug? I can see Input=4G when using (parquet.task.side.metadata,false) and Input=140G when using (parquet.task.side.metadata,true) but the runtimes are very comparable? [image: Inline image 1] JobId 4 is the ClientSide split, JobId 5 is the TaskSide split. 
On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I am running the following (connecting to an external Hive Metastore) /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf *spark.sql.parquet.filterPushdown=true* val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) and then ran two queries: sqlContext.sql(select count(*) from table where partition='blah' ) andsqlContext.sql(select count(*) from table where partition='blah' and epoch=1415561604) According to the Input tab in the UI both scan about 140G of data which is the size of my whole partition. So I have two questions -- 1. is there a way to tell from the plan if a predicate pushdown is supposed to happen? I see this for the second query res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L] Exchange SinglePartition Aggregate true, [], [COUNT(1) AS PartialCount#49L] OutputFaker [] Project [] ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files 2. am I doing something obviously wrong that this is not working? (Im guessing it's not woring because the input size for the second query shows unchanged and the execution time is almost 2x as long) thanks in advance for any insights
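Given the note above that only properties prefixed with spark.hadoop are merged into the Hadoop Configuration, one untested way to pass the flag at launch time (host/port illustrative), besides putting it in core-site.xml as suggested:

/a/shark/spark/bin/spark-shell --master spark://ip:7077 \
  --conf spark.sql.parquet.filterPushdown=true \
  --conf spark.hadoop.parquet.task.side.metadata=false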
Re: Installing Spark Standalone to a Cluster
You can do ./sbin/start-slave.sh --master spark://IP:PORT. I believe you're missing --master. In addition, it's a good idea to pass --master exactly the spark master's endpoint as shown on your UI under http://localhost:8080. But that should do it. If that's not working, you can look at the Worker log and see where it's trying to connect to and whether it's getting any errors. On Thu, Jan 22, 2015 at 12:06 PM, riginos samarasrigi...@gmail.com wrote: I have downloaded spark-1.2.0.tgz on each of my nodes and executed ./sbt/sbt assembly on each of them. So I execute ./sbin/start-master.sh on my master and ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT. Although when I go to http://localhost:8080 I cannot see any worker. Why is that? Am I doing something wrong with the installation/deployment of Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-Standalone-to-a-Cluster-tp21319.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
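Spelled out, the minimal sequence described above is the following (the master URL is illustrative; use exactly what the master UI at http://localhost:8080 shows):

# on the master node
./sbin/start-master.sh
# on each worker node, point the worker at the master URL shown on the master UI
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master-host:7077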
Parquet predicate pushdown troubles
I am running the following (connecting to an external Hive Metastore) /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf *spark.sql.parquet.filterPushdown=true* val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) and then ran two queries: sqlContext.sql(select count(*) from table where partition='blah' ) andsqlContext.sql(select count(*) from table where partition='blah' and epoch=1415561604) According to the Input tab in the UI both scan about 140G of data which is the size of my whole partition. So I have two questions -- 1. is there a way to tell from the plan if a predicate pushdown is supposed to happen? I see this for the second query res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L] Exchange SinglePartition Aggregate true, [], [COUNT(1) AS PartialCount#49L] OutputFaker [] Project [] ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files 2. am I doing something obviously wrong that this is not working? (Im guessing it's not woring because the input size for the second query shows unchanged and the execution time is almost 2x as long) thanks in advance for any insights
Re: Getting Output From a Cluster
are you calling the saveAsText files on the DStream --looks like it? Look at the section called Design Patterns for using foreachRDD in the link you sent -- you want to do dstream.foreachRDD(rdd = rdd.saveAs) On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, Thanks in advance for the help! I successfully got my Kafka/Spark WordCount app to print locally. However, I want to run it on a cluster, which means that I will have to save it to HDFS if I want to be able to read the output. I am running Spark 1.1.0, which means according to this document: https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html I should be able to use commands such as saveAsText/HadoopFiles. 1) When I try saveAsTextFiles it says: cannot find symbol [ERROR] symbol : method saveAsTextFiles(java.lang.String,java.lang.String) [ERROR] location: class org.apache.spark.streaming.api.java.JavaPairDStreamjava.lang.String,java.lang.Integer This makes some sense as saveAsTextFiles is not included here: http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html 2) When I try saveAsHadoopFiles(hdfs://ipus-west-1.compute.internal:8020/user/testwordcount, txt) it builds, but when I try running it it throws this exception: Exception in thread main java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079) at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2073) ... 14 more Any help is really appreciated! Thanks.
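The design pattern being referred to, as a small Scala sketch; wordCounts stands for the pair DStream from the word count app and the output path is made up:

wordCounts.foreachRDD { (rdd, time) =>
  // save each batch from the workers, one output directory per batch interval
  rdd.saveAsTextFile(s"hdfs:///user/test/wordcounts-${time.milliseconds}")
}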
textFile partitions
Hi folks, puzzled by something pretty simple: I have a standalone cluster with default parallelism of 2, spark-shell running with 2 cores. sc.textFile("README.md").partitions.size returns 2 (this makes sense). sc.textFile("README.md").coalesce(100,true).partitions.size returns 100, also makes sense. But sc.textFile("README.md",100).partitions.size gives 102 -- I was expecting this to be equivalent to the last statement (i.e. result in 100 partitions). I'd appreciate it if someone can enlighten me as to why I end up with 102. This is on Spark 1.2. Thanks
Re: Saving partial (top 10) DStream windows to hdfs
I'm glad you solved this issue but have a followup question for you. Wouldn't Akhil's solution be better for you after all? I run similar computation where a large set of data gets reduced to a much smaller aggregate in an interval. If you do saveAsText without coalescing, I believe you'd get the same number of files as you have partitions. So in a worse case scenario, you'd end up with 10 partitions(if each item in your top 10 was in a different partition) and thus, 10 output files. Seems to me this would be horrible for subsequent processing of these (as in, this is small-files in HDFS at its worst). But even with coalesce 1 you'd get 1 pretty small file on every interval I'm curious what you think because everytime I come to a situation like this I end up using a store that supports appends (some sort of DB) but it's more code/work. So I'm curious on your experience of saving to files (unless you have a separate process to merge these chunks, of course) On Wed, Jan 7, 2015 at 11:56 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, It worked out as this. val topCounts = sortedCounts.transform(rdd = rdd.zipWithIndex().filter(x=x._2 =10)) Regards, Laeeq On Wednesday, January 7, 2015 5:25 PM, Laeeq Ahmed laeeqsp...@yahoo.com.INVALID wrote: Hi Yana, I also think that *val top10 = your_stream.mapPartitions(rdd = rdd.take(10))* will give top 10 from each partition. I will try your code. Regards, Laeeq On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: My understanding is that *val top10 = your_stream.mapPartitions(rdd = rdd.take(10))* would result in an RDD containing the top 10 entries per partition -- am I wrong? I am not sure if there is a more efficient way but I think this would work: *sortedCounts.*zipWithIndex().filter(x=x._2 =10).saveAsText On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I applied it as fallows: eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) - 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2) val counts = eegStreams(a).map(x = math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4)) val sortedCounts = counts.map(_.swap).transform(rdd = rdd.sortByKey(false)).map(_.swap) val topCounts = sortedCounts.mapPartitions(rdd=rdd.take(10)) *//val topCounts = sortedCounts.transform(rdd = ssc.sparkContext.makeRDD(rdd.take(10)))* topCounts.map(tuple = %s,%s.format(tuple._1, tuple._2)).saveAsTextFiles(hdfs:// ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/ + (a+1)) topCounts.print() It gives the output with 10 extra values. I think it works on partition of each rdd rather than just rdd. I also tried the commented code. It gives correct result but in the start it gives serialisation error ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext Output for code in red: The values in green looks extra to me. 0,578 -3,576 4,559 -1,556 3,553 -6,540 6,538 -4,535 1,526 10,483 *94,8* *-113,8* *-137,8* *-85,8* *-91,8* *-121,8* *114,8* *108,8* *93,8* *101,8* 1,128 -8,118 3,112 -4,110 -13,108 4,108 -3,107 -10,107 -6,106 8,105 *76,6* *74,6* *60,6* *52,6* *70,6* *71,6* *-60,6* *55,6* *78,5* *64,5* and so on. 
Regards, Laeeq On Tuesday, January 6, 2015 9:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can try something like: *val top10 = your_stream.mapPartitions(rdd = rdd.take(10))* Thanks Best Regards On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I am counting values in each window and find the top values and want to save only the top 10 frequent values of each window to hdfs rather than all the values. *eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) - 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)* *val counts = eegStreams(a).map(x = (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))* *val sortedCounts = counts.map(_.swap).transform(rdd = rdd.sortByKey(false)).map(_.swap)* *//sortedCounts.foreachRDD(rdd =println(\nTop 10 amplitudes:\n + rdd.take(10).mkString(\n)))* *sortedCounts.map(tuple = %s,%s.format(tuple._1, tuple._2)).saveAsTextFiles(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/ http://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/ + (a+1))* I can print top 10 as above in red. I have also tried *sortedCounts.foreachRDD{ rdd = ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/ http://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/ + (a+1))} * but I get the following error. *15/01/05 17:12:23 ERROR actor.OneForOneStrategy
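Since some characters in the code above were lost in the archive, here is a cleaned-up sketch of the transform/zipWithIndex approach that worked; stream names and the output path are illustrative:

// keep only the 10 highest-ranked (value, count) pairs of each window
val topCounts = sortedCounts.transform(rdd => rdd.zipWithIndex().filter(_._2 < 10).map(_._1))
topCounts.map { case (value, count) => s"$value,$count" }
  .saveAsTextFiles("hdfs://namenode:9000/user/hduser/output/top10")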
Re: Is it possible to use windows service to start and stop spark standalone cluster
You might also want to see if the Windows Task Scheduler helps with that. I have not used it with Windows 2008 R2 but it generally does allow you to schedule a bat file to run on startup. On Wed, Mar 11, 2015 at 10:16 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Thanks for the suggestion. I will try that. Ningjun From: Silvio Fiorito [mailto:silvio.fior...@granturing.com] Sent: Wednesday, March 11, 2015 12:40 AM To: Wang, Ningjun (LNG-NPV); user@spark.apache.org Subject: Re: Is it possible to use windows service to start and stop spark standalone cluster Have you tried Apache Daemon? http://commons.apache.org/proper/commons-daemon/procrun.html From: Wang, Ningjun (LNG-NPV) Date: Tuesday, March 10, 2015 at 11:47 PM To: user@spark.apache.org Subject: Is it possible to use windows service to start and stop spark standalone cluster We are using a spark stand alone cluster on Windows 2008 R2. I can start the spark cluster by opening a command prompt and running the following: bin\spark-class.cmd org.apache.spark.deploy.master.Master bin\spark-class.cmd org.apache.spark.deploy.worker.Worker spark://mywin.mydomain.com:7077 I can stop the spark cluster by pressing Ctrl-C. The problem is that if the machine is rebooted, I have to manually start the spark cluster again as above. Is it possible to use a windows service to start the cluster? This way when the machine is rebooted, the windows service will automatically restart the spark cluster. How to stop the spark cluster using a windows service is also a challenge. Please advise. Thanks Ningjun
Re: Errors in spark
I was actually just able to reproduce the issue. I do wonder if this is a bug -- the docs say When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory. But as you can see in from the message warehouse is not in the current directory, it is under /user/hive. In my case this directory was owned by 'root' and noone else had write permissions. Changing the permissions works if you need to get unblocked quickly...But it does seem like a bug to me... On Fri, Feb 27, 2015 at 11:21 AM, sandeep vura sandeepv...@gmail.com wrote: Hi yana, I have removed hive-site.xml from spark/conf directory but still getting the same errors. Anyother way to work around. Regards, Sandeep On Fri, Feb 27, 2015 at 9:38 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I think you're mixing two things: the docs say When* not *configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory.. AFAIK if you want a local metastore, you don't put hive-site.xml anywhere. You only need the file if you're going to point to an external metastore. If you're pointing to an external metastore, in my experience I've also had to copy core-site.xml into conf in order to specify this property: namefs.defaultFS/name On Fri, Feb 27, 2015 at 10:39 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Sparkers, I am using hive version - hive 0.13 and copied hive-site.xml in spark/conf and using default derby local metastore . While creating a table in spark shell getting the following error ..Can any one please look and give solution asap.. sqlContext.sql(CREATE TABLE IF NOT EXISTS sandeep (key INT, value STRING)) 15/02/27 23:06:13 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse_1/sandeep is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) at com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at com.sun.proxy.$Proxy13.createTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613) at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85) at 
org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at $line9.$read$$iwC$$iwC$$iwC
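If quick unblocking is the goal, the permission change mentioned above would be something like the following; the path is the one from the error message (a local file: path here), so adjust it for your layout, and use hdfs dfs -chmod instead if your warehouse is on HDFS:

sudo chmod -R a+w /user/hive/warehouse_1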
Re: Errors in spark
I think you're mixing two things: the docs say When* not *configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory.. AFAIK if you want a local metastore, you don't put hive-site.xml anywhere. You only need the file if you're going to point to an external metastore. If you're pointing to an external metastore, in my experience I've also had to copy core-site.xml into conf in order to specify this property: namefs.defaultFS/name On Fri, Feb 27, 2015 at 10:39 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Sparkers, I am using hive version - hive 0.13 and copied hive-site.xml in spark/conf and using default derby local metastore . While creating a table in spark shell getting the following error ..Can any one please look and give solution asap.. sqlContext.sql(CREATE TABLE IF NOT EXISTS sandeep (key INT, value STRING)) 15/02/27 23:06:13 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse_1/sandeep is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) at com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at com.sun.proxy.$Proxy13.createTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613) at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at $line9.$read$$iwC$$iwC$$iwC$$iwC.init(console:15) at $line9.$read$$iwC$$iwC$$iwC.init(console:20) at $line9.$read$$iwC$$iwC.init(console:22) at $line9.$read$$iwC.init(console:24) at $line9.$read.init(console:26) at $line9.$read$.init(console:30) at $line9.$read$.clinit(console) at $line9.$eval$.init(console:7) at $line9.$eval$.clinit(console) at $line9.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at
Re: Running multiple threads with same Spark Context
I am not sure if your issue is setting the Fair mode correctly or something else so let's start with the FAIR mode. Do you see scheduler mode actually being set to FAIR: I have this line in spark-defaults.conf spark.scheduler.allocation.file=/spark/conf/fairscheduler.xml Then, when I start my application, I can see that it is using that scheduler in the UI -- go to master UI and click on your application. Then you should see this (note the scheduling mode is shown as Fair): On Wed, Feb 25, 2015 at 4:06 AM, Harika Matha matha.har...@gmail.com wrote: Hi Yana, I tried running the program after setting the property spark.scheduler.mode to FAIR. But the result is same as previous. Are there any other properties that have to be set? On Tue, Feb 24, 2015 at 10:26 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: It's hard to tell. I have not run this on EC2 but this worked for me: The only thing that I can think of is that the scheduling mode is set to - *Scheduling Mode:* FAIR val pool: ExecutorService = Executors.newFixedThreadPool(poolSize) while_loop to get curr_job pool.execute(new ReportJob(sqlContext, curr_job, i)) class ReportJob(sqlContext:org.apache.spark.sql.hive.HiveContext,query: String,id:Int) extends Runnable with Logging { def threadId = (Thread.currentThread.getName() + \t) def run() { logInfo(s* Running ${threadId} ${id}) val startTime = Platform.currentTime val hiveQuery=query val result_set = sqlContext.sql(hiveQuery) result_set.repartition(1) result_set.saveAsParquetFile(shdfs:///tmp/${id}) logInfo(s* DONE ${threadId} ${id} time: +(Platform.currentTime-startTime)) } } On Tue, Feb 24, 2015 at 4:04 AM, Harika matha.har...@gmail.com wrote: Hi all, I have been running a simple SQL program on Spark. To test the concurrency, I have created 10 threads inside the program, all threads using same SQLContext object. When I ran the program on my EC2 cluster using spark-submit, only 3 threads were running in parallel. I have repeated the test on different EC2 clusters (containing different number of cores) and found out that only 3 threads are running in parallel on every cluster. Why is this behaviour seen? What does this number 3 specify? Is there any configuration parameter that I have to set if I want to run more threads concurrently? Thanks Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-threads-with-same-Spark-Context-tp21784.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
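For reference, the settings mentioned above as spark-defaults.conf lines (the allocation file path is the one from this message; adjust to your install):

spark.scheduler.mode              FAIR
spark.scheduler.allocation.file   /spark/conf/fairscheduler.xml

A quick check from the shell is sc.getConf.get("spark.scheduler.mode"), which should return FAIR once the property is picked up.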
Executor size and checkpoints
Hi all, I had a streaming application and midway through things decided to up the executor memory. I spent a long time launching like this: ~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest --executor-memory 2G --master... and observing the executor memory is still at old 512 setting I was about to ask if this is a bug when I decided to delete the checkpoints. Sure enough the setting took after that. So my question is -- why is it required to remove checkpoints to increase memory allowed on an executor? This seems pretty un-intuitive to me. Thanks for any insights.
Re: Executor size and checkpoints
Tathagata, yes, I was using StreamingContext.getOrCreate. My question is about the design decision here. I was expecting that if I have a streaming application that say crashed, and I wanted to give the executors more memory, I would be able to restart, using the checkpointed RDD but with more memory. I thought deleting the checkpoints in a checkpointed application is the last thing that you want to do (as you lose all state). Seems a bit harsh to have to do this just to increase the amount of memory? On Mon, Feb 23, 2015 at 11:12 PM, Tathagata Das t...@databricks.com wrote: Hey Yana, I think you posted screenshots, but they are not visible in the email. Probably better to upload them and post links. Are you using StreamingContext.getOrCreate? If that is being used, then it will recreate the SparkContext with SparkConf having whatever configuration is present in the existing checkpoint files. It may so happen that the existing checkpoint files were from an old run which had 512 configured. So the SparkConf in the restarted SparkContext/StremingContext is accidentally picking up the old configuration. Deleting the checkpoint files avoided a restart, and the new config took affect. Maybe. :) TD On Sat, Feb 21, 2015 at 7:30 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi all, I had a streaming application and midway through things decided to up the executor memory. I spent a long time launching like this: ~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest --executor-memory 2G --master... and observing the executor memory is still at old 512 setting I was about to ask if this is a bug when I decided to delete the checkpoints. Sure enough the setting took after that. So my question is -- why is it required to remove checkpoints to increase memory allowed on an executor? This seems pretty un-intuitive to me. Thanks for any insights.
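A minimal sketch of the StreamingContext.getOrCreate pattern under discussion; the checkpoint directory and batch interval are made up:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StreamingTest")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///checkpoints/streaming-test")
  // DStream setup goes here
  ssc
}

// on restart this rebuilds the context (and its SparkConf) from the checkpoint if one exists,
// which is why config changes made only on the command line may not take effect
val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/streaming-test", createContext _)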
[SparkSQL] Number of map tasks in SparkSQL
Shark used to have shark.map.tasks variable. Is there an equivalent for Spark SQL? We are trying a scenario with heavily partitioned Hive tables. We end up with a UnionRDD with a lot of partitions underneath and hence too many tasks: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala#L202 is there a good way to tell SQL to coalesce these? thanks for any pointers
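Not a confirmed answer, but one thing that can be tried is coalescing the SchemaRDD that comes back from the query; a sketch with a made-up table and partition count:

val rows = sqlContext.sql("SELECT * FROM heavily_partitioned_table WHERE dt = '2015-02-01'")
// merge the many underlying partitions into fewer ones without a shuffle
val fewer = rows.coalesce(64, shuffle = false)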
Re: Running multiple threads with same Spark Context
It's hard to tell. I have not run this on EC2 but this worked for me: The only thing that I can think of is that the scheduling mode is set to - *Scheduling Mode:* FAIR val pool: ExecutorService = Executors.newFixedThreadPool(poolSize) while_loop to get curr_job pool.execute(new ReportJob(sqlContext, curr_job, i)) class ReportJob(sqlContext:org.apache.spark.sql.hive.HiveContext,query: String,id:Int) extends Runnable with Logging { def threadId = (Thread.currentThread.getName() + \t) def run() { logInfo(s* Running ${threadId} ${id}) val startTime = Platform.currentTime val hiveQuery=query val result_set = sqlContext.sql(hiveQuery) result_set.repartition(1) result_set.saveAsParquetFile(shdfs:///tmp/${id}) logInfo(s* DONE ${threadId} ${id} time: +(Platform.currentTime-startTime)) } } On Tue, Feb 24, 2015 at 4:04 AM, Harika matha.har...@gmail.com wrote: Hi all, I have been running a simple SQL program on Spark. To test the concurrency, I have created 10 threads inside the program, all threads using same SQLContext object. When I ran the program on my EC2 cluster using spark-submit, only 3 threads were running in parallel. I have repeated the test on different EC2 clusters (containing different number of cores) and found out that only 3 threads are running in parallel on every cluster. Why is this behaviour seen? What does this number 3 specify? Is there any configuration parameter that I have to set if I want to run more threads concurrently? Thanks Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-threads-with-same-Spark-Context-tp21784.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
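Because quotes were stripped from the code above by the archive, here is a cleaned-up sketch of the same pattern; it assumes an existing HiveContext named sqlContext, and poolSize, queries, and the output path are placeholders:

import java.util.concurrent.{ExecutorService, Executors}

class ReportJob(sqlContext: org.apache.spark.sql.hive.HiveContext, query: String, id: Int)
    extends Runnable {
  def run(): Unit = {
    val result = sqlContext.sql(query)
    result.saveAsParquetFile(s"hdfs:///tmp/report_$id")  // output path is illustrative
  }
}

val poolSize = 4                                         // placeholder
val queries = Seq("select ...", "select ...")            // placeholder queries
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
queries.zipWithIndex.foreach { case (q, i) => pool.execute(new ReportJob(sqlContext, q, i)) }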
[SparkSQL, Spark 1.2] UDFs in group by broken?
Can someone confirm if they can run UDFs in group by in spark1.2? I have two builds running -- one from a custom build from early December (commit 4259ca8dd12) which works fine, and Spark1.2-RC2. On the latter I get: jdbc:hive2://XXX.208:10001 select from_unixtime(epoch,'-MM-dd-HH'),count(*) count . . . . . . . . . . . . . . . . . . from tbl . . . . . . . . . . . . . . . . . . group by from_unixtime(epoch,'-MM-dd-HH'); Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Expression not in GROUP BY: HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH) AS _c0#1004, tree: Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH)], [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH) AS _c0#1004,COUNT(1) AS count#1003L] MetastoreRelation default, tbl, None (state=,code=0) This worked fine on my older build. I don't see a JIRA on this but maybe I'm not looking right. Can someone please advise?
Re: Help me understand the partition, parallelism in Spark
Imran, I have also observed the phenomenon of reducing the cores helping with OOM. I wanted to ask this (hopefully without straying off topic): we can specify the number of cores and the executor memory. But we don't get to specify _how_ the cores are spread among executors. Is it possible that with 24G memory and 4 cores we get a spread of 1 core per executor thus ending up with 24G for the task, but with 24G memory and 10 cores some executor ends up with 3 cores on the same machine and thus we have only 8G per task? On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid iras...@cloudera.com wrote: Hi Yong, mostly correct except for: - Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys. no, you will not get 1000 partitions. Spark has to decide how many partitions to use before it even knows how many unique keys there are. If you have 200 as the default parallelism (or you just explicitly make it the second parameter to reduceByKey()), then you will get 200 partitions. The 1000 unique keys will be distributed across the 200 partitions. ideally they will be distributed pretty equally, but how they get distributed depends on the partitioner (by default you will have a HashPartitioner, so it depends on the hash of your keys). Note that this is more or less the same as in Hadoop MapReduce. the amount of parallelism matters b/c there are various places in spark where there is some overhead proportional to the size of a partition. So in your example, if you have 1000 unique keys in 200 partitions, you expect about 5 unique keys per partitions -- if instead you had 10 partitions, you'd expect 100 unique keys per partitions, and thus more data and you'd be more likely to hit an OOM. But there are many other possible sources of OOM, so this is definitely not the *only* solution. Sorry I can't comment in particular about Spark SQL -- hopefully somebody more knowledgeable can comment on that. On Wed, Feb 25, 2015 at 8:58 PM, java8964 java8...@hotmail.com wrote: Hi, Sparkers: I come from the Hadoop MapReducer world, and try to understand some internal information of spark. From the web and this list, I keep seeing people talking about increase the parallelism if you get the OOM error. I tried to read document as much as possible to understand the RDD partition, and parallelism usage in the spark. I understand that for RDD from HDFS, by default, one partition will be one HDFS block, pretty straightforward. I saw that lots of RDD operations support 2nd parameter of parallelism. This is the part confuse me. From my understand, the parallelism is totally controlled by how many cores you give to your job. Adjust that parameter, or spark.default.parallelism shouldn't have any impact. For example, if I have a 10G data in HDFS, and assume the block size is 128M, so we get 100 blocks/partitions in RDD. Now if I transfer that RDD to a Pair RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey action, using 200 as the default parallelism. Here is what I assume: - We have 100 partitions, as the data comes from 100 blocks. Most likely the spark will generate 100 tasks to read and shuffle them? - The 1000 unique keys mean the 1000 reducer group, like in MR - If I set the max core to be 50, so there will be up to 50 tasks can be run concurrently. The rest tasks just have to wait for the core, if there are 50 tasks are running. - Since we are doing reduceByKey, shuffling will happen. 
Data will be shuffled into 1000 partitions, as we have 1000 unique keys. - I don't know these 1000 partitions will be processed by how many tasks, maybe this is the parallelism parameter comes in? - No matter what parallelism this will be, there are ONLY 50 task can be run concurrently. So if we set more cores, more partitions' data will be processed in the executor (which runs more thread in this case), so more memory needs. I don't see how increasing parallelism could help the OOM in this case. - In my test case of Spark SQL, I gave 24G as the executor heap, my join between 2 big datasets keeps getting OOM. I keep increasing the spark.default.parallelism, from 200 to 400, to 2000, even to 4000, no help. What really makes the query finish finally without OOM is after I change the --total-executor-cores from 10 to 4. So my questions are: 1) What is the parallelism really mean in the Spark? In the simple example above, for reduceByKey, what difference it is between parallelism change from 10 to 20? 2) When we talk about partition in the spark, for the data coming from HDFS, I can understand the partition clearly. For the intermediate data, the partition will be same as key, right? For group, reducing, join action, uniqueness of the keys will be partition. Is that correct? 3) Why increasing parallelism could help OOM?
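A tiny illustration of the point about the second parameter to reduceByKey; the input path is made up:

val pairs = sc.textFile("hdfs:///data/10g-input").map(line => (line.split('\t')(0), 1))
// 200 output partitions, regardless of how many distinct keys exist
val counts = pairs.reduceByKey(_ + _, 200)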
Re: Help me understand the partition, parallelism in Spark
Yong, for the 200 tasks in stage 2 and 3 -- this actually comes from the shuffle setting: spark.sql.shuffle.partitions On Thu, Feb 26, 2015 at 5:51 PM, java8964 java8...@hotmail.com wrote: Imran, thanks for your explaining about the parallelism. That is very helpful. In my test case, I am only use one box cluster, with one executor. So if I put 10 cores, then 10 concurrent task will be run within this one executor, which will handle more data than 4 core case, then leaded to OOM. I haven't setup Spark on our production cluster yet, but assume that we have 100 nodes cluster, if I guess right, set up to 1000 cores mean that on average, each box's executor will run 10 threads to process data. So lowering core will reduce the speed of spark, but can help to avoid the OOM, as less data to be processed in the memory. My another guess is that each partition will be processed by one core eventually. So make bigger partition count can decrease partition size, which should help the memory footprint. In my case, I guess that Spark SQL in fact doesn't use the spark.default.parallelism setting, or at least in my query, it is not used. So no matter what I changed, it doesn't matter. The reason I said that is that there is always 200 tasks in stage 2 and 3 of my query job, no matter what I set the spark.default.parallelism. I think lowering the core is to exchange lower memory usage vs speed. Hope my understanding is correct. Thanks Yong -- Date: Thu, 26 Feb 2015 17:03:20 -0500 Subject: Re: Help me understand the partition, parallelism in Spark From: yana.kadiy...@gmail.com To: iras...@cloudera.com CC: java8...@hotmail.com; user@spark.apache.org Imran, I have also observed the phenomenon of reducing the cores helping with OOM. I wanted to ask this (hopefully without straying off topic): we can specify the number of cores and the executor memory. But we don't get to specify _how_ the cores are spread among executors. Is it possible that with 24G memory and 4 cores we get a spread of 1 core per executor thus ending up with 24G for the task, but with 24G memory and 10 cores some executor ends up with 3 cores on the same machine and thus we have only 8G per task? On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid iras...@cloudera.com wrote: Hi Yong, mostly correct except for: - Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys. no, you will not get 1000 partitions. Spark has to decide how many partitions to use before it even knows how many unique keys there are. If you have 200 as the default parallelism (or you just explicitly make it the second parameter to reduceByKey()), then you will get 200 partitions. The 1000 unique keys will be distributed across the 200 partitions. ideally they will be distributed pretty equally, but how they get distributed depends on the partitioner (by default you will have a HashPartitioner, so it depends on the hash of your keys). Note that this is more or less the same as in Hadoop MapReduce. the amount of parallelism matters b/c there are various places in spark where there is some overhead proportional to the size of a partition. So in your example, if you have 1000 unique keys in 200 partitions, you expect about 5 unique keys per partitions -- if instead you had 10 partitions, you'd expect 100 unique keys per partitions, and thus more data and you'd be more likely to hit an OOM. But there are many other possible sources of OOM, so this is definitely not the *only* solution. 
Sorry I can't comment in particular about Spark SQL -- hopefully somebody more knowledgeable can comment on that. On Wed, Feb 25, 2015 at 8:58 PM, java8964 java8...@hotmail.com wrote: Hi, Sparkers: I come from the Hadoop MapReducer world, and try to understand some internal information of spark. From the web and this list, I keep seeing people talking about increase the parallelism if you get the OOM error. I tried to read document as much as possible to understand the RDD partition, and parallelism usage in the spark. I understand that for RDD from HDFS, by default, one partition will be one HDFS block, pretty straightforward. I saw that lots of RDD operations support 2nd parameter of parallelism. This is the part confuse me. From my understand, the parallelism is totally controlled by how many cores you give to your job. Adjust that parameter, or spark.default.parallelism shouldn't have any impact. For example, if I have a 10G data in HDFS, and assume the block size is 128M, so we get 100 blocks/partitions in RDD. Now if I transfer that RDD to a Pair RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey action, using 200 as the default parallelism. Here is what I assume: - We have 100 partitions, as the data comes from 100 blocks. Most likely the spark will generate 100 tasks to
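The setting behind the fixed 200 tasks mentioned above can be changed like this (the value is illustrative):

sqlContext.setConf("spark.sql.shuffle.partitions", "400")
// or, equivalently, from SQL:
sqlContext.sql("SET spark.sql.shuffle.partitions=400")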
[SparkSQL] Try2: Parquet predicate pushdown troubles
Attempting to bump this up in case someone can help out after all. I spent a few good hours stepping through the code today, so I'll summarize my observations both in the hope that I get some help and to help others who might be looking into this:

1. I am setting spark.sql.parquet.filterPushdown=true
2. I can see by stepping through the driver debugger that ParquetTableOperations.execute sets the filters via ParquetInputFormat.setFilterPredicate (I checked the conf object, things appear OK there)
3. In FilteringParquetRowInputFormat, I go through the code path for getTaskSideSplits. It seems that the code path for getClientSideSplits would try to drop row groups, but I don't see anything similar in getTaskSideSplits.

Does anyone have pointers on where to look after this? Where does row-group filtering happen in the getTaskSideSplits case? I can attach to the executor but am not quite sure what Parquet-related code gets called executor side... I also don't see any messages in the executor logs related to filtering predicates.

For comparison, I went through getClientSideSplits and can see that predicate pushdown works OK:

sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")
15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy
15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800)
15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %)!

Is it possible that this is just a UI bug? I can see Input=4G when using parquet.task.side.metadata=false and Input=140G when using parquet.task.side.metadata=true, but the runtimes are very comparable?

[image: Inline image 1]

JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.

On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote:

I am running the following (connecting to an external Hive Metastore):

/a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf spark.sql.parquet.filterPushdown=true
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

and then ran two queries:

sqlContext.sql("select count(*) from table where partition='blah'")
and
sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")

According to the Input tab in the UI, both scan about 140G of data, which is the size of my whole partition. So I have two questions:

1. Is there a way to tell from the plan if a predicate pushdown is supposed to happen? I see this for the second query:

res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#49L]
   OutputFaker []
    Project []
     ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

2. Am I doing something obviously wrong that this is not working? (I'm guessing it's not working because the input size for the second query shows unchanged and the execution time is almost 2x as long.)

thanks in advance for any insights
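For anyone hitting the same wall, here is a rough sketch of how the settings discussed above fit together in a spark-shell session (1.2/1.3-era API). The table name, partition column, and epoch value are placeholders carried over from the thread, and this only reproduces the client-side-metadata configuration where the row-group dropping is visible in the driver logs -- it is not a fix for the task-side path.

    import org.apache.spark.sql.hive.HiveContext

    // assumes the spark-shell's SparkContext is available as sc
    val sqlContext = new HiveContext(sc)

    // Ask Spark SQL to push filters down into the Parquet scan
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

    // Use client-side metadata splits so row-group filtering is logged on the driver
    sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

    val result = sqlContext.sql(
      "select count(*) from table where partition='blah' and epoch=1415561604")

    // The physical plan should show a ParquetTableScan; the pushed predicate itself
    // only shows up in the FilterCompat / FilteringParquetRowInputFormat log lines
    println(result.queryExecution.executedPlan)
    result.collect()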
[SQL] Simple DataFrame questions
Hi folks, having some seemingly noob issues with the DataFrame API. I have a DF which came from the csv package.

1. What would be an easy way to cast a column to a given type -- my DF columns are all typed as strings coming from a csv. I see a schema getter but not a setter on DF.
2. I am trying to use the syntax used in various blog posts but can't figure out how to reference a column by name:

scala> df.filter("customer_id" != "")
console:23: error: overloaded method value filter with alternatives:
  (conditionExpr: String)org.apache.spark.sql.DataFrame and
  (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
 cannot be applied to (Boolean)
       df.filter("customer_id" != "")

3. What would be the recommended way to drop a row containing a null value -- is it possible to do this:

scala> df.filter("customer_id IS NOT NULL")
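In case it helps others with the same questions, here is a sketch of how each of the three items could be attempted against the 1.3.x DataFrame API; df and customer_id come from the question above, while the new column name and target type are just examples.

    import org.apache.spark.sql.types.IntegerType

    // 1. Cast a string column to a given type by deriving a new, typed column
    val typed = df.withColumn("customer_id_int", df("customer_id").cast(IntegerType))

    // 2. Reference a column by name: either the Column API or a SQL expression string
    val nonEmpty  = df.filter(df("customer_id") !== "")
    val nonEmpty2 = df.filter("customer_id != ''")

    // 3. Drop rows containing nulls: na.drop(), or an explicit SQL-expression filter
    val noNulls  = df.na.drop()
    val noNulls2 = df.filter("customer_id IS NOT NULL")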
Escaping user input for Hive queries
Hi folks, we have been using a JDBC connection to Spark's Thrift Server so far, using JDBC prepared statements to escape potentially malicious user input. I am trying to port our code directly to HiveContext now (i.e. eliminate the use of the Thrift Server) and I am not quite sure how to generate a properly escaped SQL statement... Wondering if someone has ideas on the proper way to do this? To be concrete, I would love to issue this statement

val df = myHiveCtxt.sql(sqlText)

but I would like to defend against potential SQL injection.
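One possible approach (my own suggestion, not from the thread): since HiveContext.sql has no prepared-statement mechanism, keep the user-supplied value out of the SQL text entirely by filtering with the Column API, where the value is passed as a literal rather than spliced into a query string. The table and column names below are hypothetical.

    import org.apache.spark.sql.hive.HiveContext

    // assumes the spark-shell's SparkContext is available as sc
    val hiveCtxt = new HiveContext(sc)

    def countForCustomer(userInput: String): Long = {
      val events = hiveCtxt.table("events") // hypothetical Hive table
      // userInput becomes a Column literal, never part of the parsed SQL string,
      // so quoting/escaping is not a concern on this path
      events.filter(events("customer_id") === userInput).count()
    }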
Re: Spark 1.3.1 and Parquet Partitions
Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-3928

Looks like for now you'd have to list the full paths... I don't see a comment from an official Spark committer, so I'm still not sure if this is a bug or by design, but it seems to be the current state of affairs.

On Thu, May 7, 2015 at 8:43 AM, yana yana.kadiy...@gmail.com wrote:

I believe this is a regression. It does not work for me either. There is a Jira on Parquet wildcards which is resolved; I'll see about getting it reopened.

Sent on the new Sprint Network from my Samsung Galaxy S®4.

Original message
From: Vaxuki
Date: 05/07/2015 7:38 AM (GMT-05:00)
To: Olivier Girardot
Cc: user@spark.apache.org
Subject: Re: Spark 1.3.1 and Parquet Partitions

Olivier, nope. Wildcard extensions don't work. I am debugging the code to figure out what's wrong. I know I am using 1.3.1 for sure. Pardon typos...

On May 7, 2015, at 7:06 AM, Olivier Girardot ssab...@gmail.com wrote:

hdfs://some ip:8029/dataset/*/*.parquet doesn't work for you?

On Thu, May 7, 2015 at 03:32, vasuki vax...@gmail.com wrote:

Spark 1.3.1 - I have a parquet file on HDFS partitioned by some string, looking like this:
/dataset/city=London/data.parquet
/dataset/city=NewYork/data.parquet
/dataset/city=Paris/data.parquet
....

I am trying to load it using sqlContext.parquetFile("hdfs://some ip:8029/dataset/" -- what do I put here?). No leads so far. Is there a way I can load the partitions? I am running on a cluster, not locally.
-V

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-and-Parquet-Partitions-tp22792.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
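Until SPARK-3928 is resolved, a workaround in the spirit of "list the full paths" might look like the sketch below. The host/port and city values are placeholders from the thread, and loading each partition directory separately and unioning the results is my assumption, not something verified against 1.3.1.

    // assumes a spark-shell session where sqlContext is already defined
    val cities = Seq("London", "NewYork", "Paris")
    val paths = cities.map(c => s"hdfs://some-ip:8029/dataset/city=$c/data.parquet")

    // Load each partition directory explicitly and union the pieces
    // (the per-partition schemas are identical, so unionAll is safe here)
    val df = paths.map(p => sqlContext.parquetFile(p)).reduce(_ unionAll _)
    df.count()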