Re: Idempotent count
Hi Binh,

It stores the state as well as the unprocessed data. It is a subset of the records that you have aggregated so far. This provides a good reference for checkpointing: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#checkpointing

On Wed, Mar 18, 2015 at 12:52 PM, Binh Nguyen Van binhn...@gmail.com wrote:

> Hi Arush,
> Thank you for answering! When you say checkpoints hold metadata and data, what is the data? Is it the data pulled from the input source, or is it the state? If it is the state, is it all of the records I have aggregated since the beginning, or only a subset of them? How can I limit the size of the state that is kept in a checkpoint?
> Thank you
> -Binh

On Tue, Mar 17, 2015 at 11:47 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:

> Hi,
> Yes, Spark Streaming is capable of stateful stream processing; "with state" or "without state" is one way of classifying streaming jobs. Checkpoints hold metadata and data.
> Thanks

On Wed, Mar 18, 2015 at 4:00 AM, Binh Nguyen Van binhn...@gmail.com wrote:

> Hi all,
> I am new to Spark, so please forgive me if my question is naive. I am trying to use Spark Streaming in an application that reads data from a queue (Kafka), does some aggregation (sum, count, ...), and then persists the result to an external storage system (MySQL, VoltDB, ...).
> From my understanding of Spark Streaming, I can do the aggregation in two ways:
> - Stateless: I keep no state and just apply new delta values to the external system. My understanding is that this way I may end up over-counting when there is a failure and replay.
> - Stateful: I use a checkpoint to keep state and blindly save the new state to the external system. This way I get correct aggregation results, but I have to keep data in two places (the state and the external system).
> My questions are:
> - Is my understanding of stateless and stateful aggregation correct? If not, please correct me!
> - For stateful aggregation, what exactly does Spark Streaming keep when it saves a checkpoint?
> Please kindly help!
Thanks
-Binh

--
[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com
*Arush Kharbanda* || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
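For reference, the stateful approach discussed in this thread is usually expressed with updateStateByKey. A minimal sketch; the DStream name, key/value types, and checkpoint path below are illustrative placeholders, not taken from the thread:

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._  // pair-DStream operations (needed on Spark 1.2.x)
import org.apache.spark.streaming.dstream.DStream

// Running count per key. Spark checkpoints this state for fault tolerance;
// that state is the "data" held in checkpoints alongside the metadata.
def runningCounts(events: DStream[(String, Long)],
                  ssc: StreamingContext): DStream[(String, Long)] = {
  ssc.checkpoint("hdfs:///checkpoints/counts")  // placeholder path; required for stateful ops
  events.updateStateByKey[Long] { (newValues: Seq[Long], state: Option[Long]) =>
    // Fold this batch's values into the previous state for the key.
    // Returning None instead would drop the key's state entirely.
    Some(state.getOrElse(0L) + newValues.sum)
  }
}
```

Note that the state grows with the number of distinct keys; returning None from the update function for expired keys is the usual way to bound it.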
Re: updateStateByKey performance API
You can always throw more machines at this and see if the performance increases, since you haven't mentioned anything about your number of cores, etc.

Thanks
Best Regards

On Wed, Mar 18, 2015 at 11:42 AM, nvrs nvior...@gmail.com wrote:

> Hi all,
> We are having a few issues with the performance of the updateStateByKey operation in Spark Streaming (1.2.1 at the moment) and any advice would be greatly appreciated. Specifically, on each tick of the system (which is set at 10 secs) we need to update a state tuple where the key is the user_id and the value is an object with some state about the user. The problem is that using Kryo serialization for 5M users, this gets so slow that we have to increase the period to more than 10 seconds so as not to fall behind.
> The input for the streaming job is a Kafka stream which consists of key-value pairs of user_ids with some sort of action codes; we join this to our checkpointed state by key and update the state.
> I understand that the reason for iterating over the whole state set is to evict items or to update everyone's state for time-dependent computations, but neither applies in our situation, and it hurts performance really badly. Is there any possibility of implementing an extra call in the API in the future for updating only a specific subset of keys?
> p.s. I will try setting the DStream to non-serialized as soon as possible, but then I am worried about GC and checkpointing performance.
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/updateStateByKey-performance-API-tp22113.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
Re: updateStateByKey performance API
Hi Akhil,

Yes, that's what we are planning on doing at the end of the day. At the moment I am doing performance testing before the job hits production, testing on 4 cores to get baseline figures, and I deduced that in order to grow to 10-15 million keys we'll need a batch interval of ~20 secs if we don't want to allocate more than 8 cores to this job. The thing is that since we have a big silent window in the user interactions, where the stream will carry very little data, we would like to be able to use these cores for batch processing during that window, but the way it currently works, we can't.

best regards
n

On Wed, Mar 18, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

> You can always throw more machines at this and see if the performance increases, since you haven't mentioned anything about your number of cores, etc.
> Thanks
> Best Regards
>
> On Wed, Mar 18, 2015 at 11:42 AM, nvrs nvior...@gmail.com wrote:
>
>> Hi all,
>> We are having a few issues with the performance of the updateStateByKey operation in Spark Streaming (1.2.1 at the moment) and any advice would be greatly appreciated. Specifically, on each tick of the system (which is set at 10 secs) we need to update a state tuple where the key is the user_id and the value is an object with some state about the user. The problem is that using Kryo serialization for 5M users, this gets so slow that we have to increase the period to more than 10 seconds so as not to fall behind.
>> The input for the streaming job is a Kafka stream which consists of key-value pairs of user_ids with some sort of action codes; we join this to our checkpointed state by key and update the state.
>> I understand that the reason for iterating over the whole state set is to evict items or to update everyone's state for time-dependent computations, but neither applies in our situation, and it hurts performance really badly. Is there any possibility of implementing an extra call in the API in the future for updating only a specific subset of keys?
>> p.s. I will try setting the DStream to non-serialized as soon as possible, but then I am worried about GC and checkpointing performance.
Apache Spark ALS recommendations approach
Trying to build a recommendation system using Spark MLlib's ALS. Currently, we're trying to pre-build recommendations for all users on a daily basis. We're using simple implicit feedback and ALS.

The problem is, we have 20M users and 30M products, and to call the main predict() method we need to build the cartesian join of users and products, which is huge; it may take days to generate the join alone. Is there a way to avoid the cartesian join to make the process faster? Currently we have 8 nodes with 64GB of RAM each; I think that should be enough for the data.

val users: RDD[Int] = ???       // RDD with 20M userIds
val products: RDD[Int] = ???    // RDD with 30M productIds
val ratings: RDD[Rating] = ???  // RDD with all user-product feedbacks

val model = new ALS()
  .setRank(10)
  .setIterations(10)
  .setLambda(0.0001)
  .setImplicitPrefs(true)
  .setAlpha(40)
  .run(ratings)

val usersProducts = users.cartesian(products)
val recommendations = model.predict(usersProducts)
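One way to sidestep the full 20M x 30M cartesian join is to keep only the top-N products per user, scoring against a broadcast copy of the product factors. The sketch below illustrates the pattern under stated assumptions: at 30M products the factor array would itself need pre-filtering or blocking before it fits in executor memory, so treat this as an illustration of the idea rather than a drop-in fix. The function name and signature are my own:

```scala
import org.apache.spark.SparkContext._  // pair-RDD operations (needed on Spark 1.2.x)
import org.apache.spark.rdd.RDD

// userFeatures is model.userFeatures; productFeatures is a collected (and
// ideally pre-filtered) copy of model.productFeatures. Both hold
// (id, factor vector) pairs from the trained ALS model.
def topN(userFeatures: RDD[(Int, Array[Double])],
         productFeatures: Array[(Int, Array[Double])],
         n: Int): RDD[(Int, Seq[(Int, Double)])] = {
  val bcProducts = userFeatures.sparkContext.broadcast(productFeatures)
  userFeatures.mapValues { uf =>
    bcProducts.value
      .map { case (pid, pf) =>
        // Predicted preference = dot product of the factor vectors.
        (pid, uf.zip(pf).map { case (a, b) => a * b }.sum)
      }
      .sortBy(-_._2)  // highest score first
      .take(n)
      .toSeq
  }
}
```

This replaces the O(users x products) shuffled join with one pass over the user factors, at the cost of broadcasting the product side.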
Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Hi everybody,

When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both 1.2.0 and 1.2.1) I encounter a weird error that never occurred before, about which I'd kindly ask for any possible help. In particular, all my Spark SQL queries fail with the following exception:

java.lang.RuntimeException: [1.218] failure: identifier expected

[my query listed]
^
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
 at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 ...

The unit tests I've got for testing this stuff fail both when I build+test the project with Maven and when I run them as single ScalaTest files or as test suites/packages. When running my app as usual on EMR in YARN cluster mode, I get the following:

15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^)
Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31) at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83) at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303) at mycompany.mypackage.MyClassFunction.apply(MyClassFunction.scala:34) at mycompany.mypackage.MyClass$.main(MyClass.scala:254) at mycompany.mypackage.MyClass.main(MyClass.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:441) 15/03/17 11:32:14 INFO
Re: Spark Job History Server
You can simply turn it on using:

./sbin/start-history-server.sh

Read more here: http://spark.apache.org/docs/1.3.0/monitoring.html

Thanks
Best Regards

On Wed, Mar 18, 2015 at 4:00 PM, patcharee patcharee.thong...@uni.no wrote:

> Hi,
> I am using Spark 1.3. I would like to use the Spark Job History Server. I added the following lines to conf/spark-defaults.conf:
>
> spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
> spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider
> spark.yarn.historyServer.address sandbox.hortonworks.com:19888
>
> But got: Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider
> What class is really needed? How do I fix it?
> Br,
> Patcharee
Re: 1.3 release
I don't think this is the problem, but you'd also want to set -Dhadoop.version= to match your deployment version if you're building for a particular version, just to be safest.

I don't recall seeing that particular error before. It indicates to me that the SparkContext is null. Is this maybe a knock-on error from the SparkContext not initializing? I can see that would then cause this init to fail.

On Tue, Mar 17, 2015 at 7:16 PM, Eric Friedman eric.d.fried...@gmail.com wrote:

> Yes, I did, with these arguments: --tgz -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver
> To be more specific about what is not working: when I launch spark-shell --master yarn, I get this error immediately after launch, and I have no idea why from looking at the source.
>
> java.lang.NullPointerException
>  at org.apache.spark.sql.SQLContext.init(SQLContext.scala:141)
>  at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:49)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>  at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1027)
>  at $iwC$$iwC.init(console:9)
>
> On Tue, Mar 17, 2015 at 7:43 AM, Sean Owen so...@cloudera.com wrote:
>
>> OK, did you build with YARN support (-Pyarn)? And the right incantation of flags like -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.2 or similar?
>>
>> On Tue, Mar 17, 2015 at 2:39 PM, Eric Friedman eric.d.fried...@gmail.com wrote:
>>
>>> I did not find that the generic build worked. In fact I also haven't gotten a build from source to work either, though that one might be a case of PEBCAK. In the former case I got errors about the build not having YARN support.

On Sun, Mar 15, 2015 at 3:03 AM, Sean Owen so...@cloudera.com wrote:

> I think (I hope) it's because the generic builds just work. Even though these are of course distributed mostly verbatim in CDH5, with tweaks at the edges to be compatible with other stuff, the stock builds should be fine too. Same for HDP as I understand it.
> The CDH4 build may work on some builds of CDH4, but I think it is lurking there as a Hadoop 2.0.x build plus a certain YARN beta. I'd prefer to rename it that way, myself, since it doesn't actually work with all of CDH4 anyway.
> Are the MapR builds there because the stock Hadoop build doesn't work on MapR? That would actually surprise me, but then, why are those two builds distributed?
>
> On Sun, Mar 15, 2015 at 6:22 AM, Eric Friedman eric.d.fried...@gmail.com wrote:
>
>> Is there a reason why the prebuilt releases don't include current CDH distros and YARN support?
>> Eric Friedman
Re: Spark + Kafka
Probably 1.3.0 - it has some improvements in the included Kafka receiver for streaming. https://spark.apache.org/releases/spark-release-1-3-0.html Regards, Jeff 2015-03-18 10:38 GMT+01:00 James King jakwebin...@gmail.com: Hi All, Which build of Spark is best when using Kafka? Regards jk
Re: [spark-streaming] can shuffle write to disk be disabled?
Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote:

> Yeah, as I said, your job processing time is much larger than the sliding window, and streaming jobs are executed one by one in sequence, so the next job will wait until the first job is finished and the total latency will accumulate. I think you need to identify the bottleneck of your job first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but in the end the shuffle data will be written to disk; this cannot be disabled, unless you mount your spark.tmp.dir on a ramdisk.

I have increased spark.shuffle.memoryFraction to 0.8, which I can see in the Spark UI's environment variables. But the spill always happens, even from the start when latency is less than the slide window (I changed it to 10 seconds). The shuffle data written to disk is really a snowball effect; it slows everything down eventually.

I noticed that the files spilled to disk are all very small in size but huge in number:

total 344K
drwxr-xr-x  2 root root 4.0K Mar 18 16:55 .
drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..
-rw-r--r--  1 root root  80K Mar 18 16:54 shuffle_47_519_0.data
-rw-r--r--  1 root root  75K Mar 18 16:54 shuffle_48_419_0.data
-rw-r--r--  1 root root  36K Mar 18 16:54 shuffle_48_518_0.data
-rw-r--r--  1 root root  69K Mar 18 16:55 shuffle_49_319_0.data
-rw-r--r--  1 root root  330 Mar 18 16:55 shuffle_49_418_0.data
-rw-r--r--  1 root root  65K Mar 18 16:55 shuffle_49_517_0.data

MemoryStore says:

15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far)
15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB.

Not enough space even for 512 bytes??

The executors still have plenty of free memory (rows from the Spark UI executors page):

0 slave1:40778 0 0.0 B / 529.9 MB 0.0 B 16 0 15047 15063 2.17 h 0.0 B 402.3 MB 768.0 B
1 slave2:50452 0 0.0 B / 529.9 MB 0.0 B 16 0 14447 14463 2.17 h 0.0 B 388.8 MB 1248.0 B
1 lvs02:47325 116 27.6 MB / 529.9 MB 0.0 B 8 0 58169 58177 3.16 h 893.5 MB 624.0 B 1189.9 MB
driver lvs02:47041 0 0.0 B / 529.9 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 0.0 B

> Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster.

3 dedicated servers, each with a 16-core CPU, 16GB of memory, and Gigabit networking. CPU load is quite low (about 1-3 from top) and network usage is far from saturated. I don't even do any useful complex calculations in this small Simple App yet.
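The shuffle settings discussed in this thread can be collected into a conf/spark-defaults.conf fragment. The values are illustrative; disabling spill trades disk I/O for out-of-memory risk, and file consolidation addresses the many-small-files symptom reported above:

```
# Keep shuffle data in memory instead of spilling (risks OOM if it doesn't fit)
spark.shuffle.spill              false
# Fraction of the heap used for shuffle aggregation buffers (default 0.2)
spark.shuffle.memoryFraction     0.8
# Merge many small per-map shuffle files into fewer, larger ones
spark.shuffle.consolidateFiles   true
```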
Re: Spark Job History Server
I turned it on, but it failed to start. In the log:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/java -cp :/root/spark-1.3.0-bin-hadoop2.4/sbin/../conf:/root/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.history.HistoryServer
15/03/18 10:23:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:191)
 at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:183)
 at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)

Patcharee

On 18. mars 2015 11:35, Akhil Das wrote:

> You can simply turn it on using:
>
> ./sbin/start-history-server.sh
>
> Read more here: http://spark.apache.org/docs/1.3.0/monitoring.html
> Thanks
> Best Regards
>
> On Wed, Mar 18, 2015 at 4:00 PM, patcharee patcharee.thong...@uni.no wrote:
>
>> Hi,
>> I am using Spark 1.3. I would like to use the Spark Job History Server. I added the following lines to conf/spark-defaults.conf:
>>
>> spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
>> spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>> spark.yarn.historyServer.address sandbox.hortonworks.com:19888
>>
>> But got: Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>> What class is really needed? How do I fix it?
>> Br,
>> Patcharee
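For what it's worth, the YarnHistoryProvider class does not ship in the stock Spark assembly; it appears to come from a vendor-specific YARN timeline integration, which would explain the ClassNotFoundException. A stock Spark 1.3 history-server setup in conf/spark-defaults.conf would instead look like the following sketch, where the HDFS path is a placeholder:

```
# Write application event logs so the history server can replay them
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-history
# The provider bundled with stock Spark (also the default if unset)
spark.history.provider           org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory    hdfs:///spark-history
```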
Re: Using Spark with a SOCKS proxy
Did you try ssh tunneling instead of SOCKS?

Thanks
Best Regards

On Wed, Mar 18, 2015 at 5:45 AM, Kelly, Jonathan jonat...@amazon.com wrote:

> I'm trying to figure out how I might be able to use Spark with a SOCKS proxy. That is, my dream is to be able to write code in my IDE, then run it without much trouble on a remote cluster that is accessible only via a SOCKS proxy between the local development machine and the master node of the cluster (ignoring, for now, any dependencies that would need to be transferred; assume it's a very simple app with no dependencies that aren't part of the Spark classpath on the cluster).
> This is possible with Hadoop by setting hadoop.rpc.socket.factory.class.default to org.apache.hadoop.net.SocksSocketFactory and hadoop.socks.server to localhost:<port>, where the port is one on which a SOCKS proxy has been opened via ssh -D to the master node. However, I can't seem to find anything like this for Spark, and I only see very few mentions of it on the user list and on Stack Overflow, with no real answers. (See links below.)
> I thought I might be able to use the JVM's -DsocksProxyHost and -DsocksProxyPort system properties, but it still does not seem to work. That is, if I start a SOCKS proxy to my master node using something like "ssh -D 2600 <master node public name>", then run a simple Spark app that calls SparkConf.setMaster("spark://<master node private IP>:7077"), passing in JVM args of -DsocksProxyHost=localhost -DsocksProxyPort=2600, the driver hangs for a while before finally giving up ("Application has been killed. Reason: All masters are unresponsive! Giving up."). It seems like it is not even attempting to use the SOCKS proxy. Do -DsocksProxyHost/-DsocksProxyPort not even work for Spark?
>
> http://stackoverflow.com/questions/28047000/connect-to-spark-through-a-socks-proxy (unanswered similar question from somebody else about a month ago)
> https://issues.apache.org/jira/browse/SPARK-5004 (unresolved, somewhat related JIRA from a few months ago)
>
> Thanks,
> Jonathan
Re: Spark + Kafka
Thanks Jeff, I'm planning to use it in standalone mode, so OK, I will use the Hadoop 2.4 package. Chao!

On Wed, Mar 18, 2015 at 10:56 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote:

> What you call sub-categories are packages pre-built to run on certain Hadoop environments. It really depends on where you want to run Spark. As far as I know, this is mainly about the included HDFS binding, so if you just want to play around with Spark, any of the packages should be fine. I wouldn't use the source package though, because you'd have to compile it yourself.
> PS: Make sure to use "Reply to all". If you're not including the mailing list in the response, I'm the only one who will get your message.
> Regards, Jeff
>
> 2015-03-18 10:49 GMT+01:00 James King jakwebin...@gmail.com:
>
>> Any sub-category recommendations: hadoop, MapR, CDH?
>>
>> On Wed, Mar 18, 2015 at 10:48 AM, James King jakwebin...@gmail.com wrote:
>>
>>> Many thanks Jeff, will give it a go.
>>>
>>> On Wed, Mar 18, 2015 at 10:47 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote:
>>>
>>>> Probably 1.3.0 - it has some improvements in the included Kafka receiver for streaming.
>>>> https://spark.apache.org/releases/spark-release-1-3-0.html
>>>> Regards, Jeff
>>>>
>>>> 2015-03-18 10:38 GMT+01:00 James King jakwebin...@gmail.com:
>>>>
>>>>> Hi All,
>>>>> Which build of Spark is best when using Kafka?
>>>>> Regards
>>>>> jk
Re: HIVE SparkSQL
Hi:

I need to count some game player events, such as:

- How many players stay in game scene 1 ("Save the Princess from a Dragon")
- How much money they have paid in the last 5 min
- How many players pay money to get through this scene more easily
- Their age distribution and gender distribution
- How many players have not logged in to the game for 5 days after they get through this game scene

The log file has been pre-formatted and can be loaded into MySQL directly:

RoleLevelUp|1426251269733|5503232ae4b00f39751f1012|2015-03-14 02:22:46|192.168.1.16|1048630|220|0|2|57|1993|
RoleLevelUp|1426251269734|5503232ae4b00f39751f1012|2015-03-14 02:22:52|192.168.1.16|1048630|水奈坤|0|0|3|67|1999|
RoleLevelUp|1426251269735|550329f9e4b00f39751f101d|2015-03-14 02:24:57|192.168.1.137|1048631|z12|0|0|41|0|380|
RoleLevelUp|1426251269736|5503232ae4b00f39751f1012|2015-03-14 02:39:01|192.168.1.16|1048630|水奈坤|0|0|15|0|2968

Now MySQL can't satisfy our analysis needs, so we want to use other technology to rebuild all the statistics systems.

Thanks
Best Regards
Yours Meng
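A hedged sketch of loading the pipe-delimited RoleLevelUp logs above into Spark SQL (Spark 1.2.x API). The case-class field names, the file path, and the example query are assumptions for illustration; only the first few columns of the log format are mapped:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumed names for the first four pipe-separated fields.
case class RoleLevelUp(event: String, eventId: Long, accountId: String, time: String)

object GameLogDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("game-logs"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // Spark 1.2.x implicit: RDD[Product] -> SchemaRDD

    val events = sc.textFile("hdfs:///game/rolelevelup.log")  // placeholder path
      .map(_.split("\\|"))
      .map(f => RoleLevelUp(f(0), f(1).toLong, f(2), f(3)))

    events.registerTempTable("role_level_up")
    // e.g. level-up events per account
    sqlContext.sql("SELECT accountId, COUNT(*) FROM role_level_up GROUP BY accountId")
      .collect()
      .foreach(println)
  }
}
```

From there, the per-scene, per-window counts listed above become SQL aggregations over the registered table rather than MySQL queries.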
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Would you mind providing the query? If it's confidential, could you please help by constructing a query that reproduces this issue?

Cheng

On 3/18/15 6:03 PM, Roberto Coluccio wrote:

> Hi everybody,
> When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both 1.2.0 and 1.2.1) I encounter a weird error that never occurred before, about which I'd kindly ask for any possible help. In particular, all my Spark SQL queries fail with the following exception:
>
> java.lang.RuntimeException: [1.218] failure: identifier expected
> [my query listed]
> ^
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
>  ...
>
> The unit tests I've got for testing this stuff fail both when I build+test the project with Maven and when I run them as single ScalaTest files or as test suites/packages. When running my app as usual on EMR in YARN cluster mode, I get the following:
>
> 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^)
> Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
Re: GraphX: Get edges for a vertex
Hi Mas,

I never actually worked with GraphX, but one idea: as far as I know, you can directly access the vertex and edge RDDs of your Graph object. Why not simply run a .filter() on the edge RDD to get all edges that originate from or end at your vertex?

Regards,
Jeff

2015-03-18 10:52 GMT+01:00 mas mas.ha...@gmail.com:

> Hi,
> Just to continue with the question: I need to find the edges of one particular vertex. However, collectNeighbors/collectNeighborIds provide the neighbors/neighbor IDs for all the vertices of the graph. Any help in this regard will be highly appreciated.
> Thanks,
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Get-edges-for-a-vertex-tp18880p22115.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
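Jeff's suggestion can be sketched as a one-line filter over the edge RDD; `graph` and `vid` here are placeholders for the asker's own graph and vertex:

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// All edges incident to a single vertex, leaving the rest of the graph alone.
def edgesOf[VD, ED](graph: Graph[VD, ED], vid: VertexId): RDD[Edge[ED]] =
  graph.edges.filter(e => e.srcId == vid || e.dstId == vid)
```

Note this still scans the full edge RDD once, but unlike collectNeighbors it returns results for only the one vertex instead of materializing neighbor lists for every vertex.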
Re: [spark-streaming] can shuffle write to disk be disabled?
I think you can disable it with spark.shuffle.spill=false Thanks Best Regards On Wed, Mar 18, 2015 at 3:39 PM, Darren Hoo darren@gmail.com wrote: Thanks, Shao On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote: Yeah, as I said your job processing time is much larger than the sliding window, and streaming job is executed one by one in sequence, so the next job will wait until the first job is finished, so the total latency will be accumulated. I think you need to identify the bottleneck of your job at first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but finally the shuffle data will be written to disk, this cannot be disabled, unless you mount your spark.tmp.dir on ramdisk. I have increased spark.shuffle.memoryFraction to 0.8 which I can see from SparKUI's environment variables But spill always happens even from start when latency is less than slide window(I changed it to 10 seconds), the shuflle data disk written is really a snow ball effect, it slows down eventually. I noticed that the files spilled to disk are all very small in size but huge in numbers: total 344K drwxr-xr-x 2 root root 4.0K Mar 18 16:55 . drwxr-xr-x 66 root root 4.0K Mar 18 16:39 .. -rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data -rw-r--r-- 1 root root 75K Mar 18 16:54 shuffle_48_419_0.data -rw-r--r-- 1 root root 36K Mar 18 16:54 shuffle_48_518_0.data -rw-r--r-- 1 root root 69K Mar 18 16:55 shuffle_49_319_0.data -rw-r--r-- 1 root root 330 Mar 18 16:55 shuffle_49_418_0.data -rw-r--r-- 1 root root 65K Mar 18 16:55 shuffle_49_517_0.data MemStore says: 15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory. 15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! 
(computed 512.0 B so far) 15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB. Not enough space even for 512 bytes?? The executors still have plenty of free memory: 0 slave1:40778 0 0.0 B / 529.9 MB 0.0 B 16 0 15047 15063 2.17 h 0.0 B 402.3 MB 768.0 B 1 slave2:50452 0 0.0 B / 529.9 MB 0.0 B 16 0 14447 14463 2.17 h 0.0 B 388.8 MB 1248.0 B 1 lvs02:47325 116 27.6 MB / 529.9 MB 0.0 B 8 0 58169 58177 3.16 h 893.5 MB 624.0 B 1189.9 MB driver lvs02:47041 0 0.0 B / 529.9 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 0.0 B Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster. 3 dedicated servers, each with a 16-core CPU + 16GB memory and Gigabit network. CPU load is quite low, about 1~3 from top, and network usage is far from saturated. I don't even do any useful complex calculations in this small Simple App yet.
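For reference, the settings discussed in this thread all go into conf/spark-defaults.conf (or are passed via --conf). A sketch of the combination under discussion; note that in Spark 1.x the scratch-directory property is spark.local.dir, which is what you would point at a ramdisk/tmpfs mount if you really want to keep shuffle output off disk (the mount path is a hypothetical example):

```
spark.shuffle.spill            false
spark.shuffle.memoryFraction   0.8
spark.local.dir                /mnt/ramdisk/spark
```

As Shao says, even with spilling disabled the final shuffle output is still written under the local directory, so the ramdisk mount is the only way to avoid physical disk writes entirely.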
Re: Spark + Kafka
What you call sub-categories are packages pre-built to run on certain Hadoop environments. It really depends on where you want to run Spark. As far as I know, this is mainly about the included HDFS binding - so if you just want to play around with Spark, any of the packages should be fine. I wouldn't use source though, because you'd have to compile it yourself. PS: Make sure to use Reply to all. If you're not including the mailing list in the response, I'm the only one who will get your message. Regards, Jeff 2015-03-18 10:49 GMT+01:00 James King jakwebin...@gmail.com: Any sub-category recommendations hadoop, MapR, CDH? On Wed, Mar 18, 2015 at 10:48 AM, James King jakwebin...@gmail.com wrote: Many thanks Jeff will give it a go. On Wed, Mar 18, 2015 at 10:47 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Probably 1.3.0 - it has some improvements in the included Kafka receiver for streaming. https://spark.apache.org/releases/spark-release-1-3-0.html Regards, Jeff 2015-03-18 10:38 GMT+01:00 James King jakwebin...@gmail.com: Hi All, Which build of Spark is best when using Kafka? Regards jk
Re: [spark-streaming] can shuffle write to disk be disabled?
I've already done that: From SparkUI Environment Spark properties has: spark.shuffle.spill false On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I think you can disable it with spark.shuffle.spill=false Thanks Best Regards On Wed, Mar 18, 2015 at 3:39 PM, Darren Hoo darren@gmail.com wrote: Thanks, Shao On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote: Yeah, as I said your job processing time is much larger than the sliding window, and streaming jobs are executed one by one in sequence, so the next job will wait until the first job is finished, so the total latency will be accumulated. I think you need to identify the bottleneck of your job at first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but finally the shuffle data will be written to disk; this cannot be disabled, unless you mount your spark.tmp.dir on ramdisk. I have increased spark.shuffle.memoryFraction to 0.8, which I can see from SparkUI's environment variables. But spill always happens even from the start when latency is less than the slide window (I changed it to 10 seconds); the shuffle data written to disk really has a snowball effect, it slows down eventually. I noticed that the files spilled to disk are all very small in size but huge in number: total 344K drwxr-xr-x 2 root root 4.0K Mar 18 16:55 . drwxr-xr-x 66 root root 4.0K Mar 18 16:39 .. -rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data -rw-r--r-- 1 root root 75K Mar 18 16:54 shuffle_48_419_0.data -rw-r--r-- 1 root root 36K Mar 18 16:54 shuffle_48_518_0.data -rw-r--r-- 1 root root 69K Mar 18 16:55 shuffle_49_319_0.data -rw-r--r-- 1 root root 330 Mar 18 16:55 shuffle_49_418_0.data -rw-r--r-- 1 root root 65K Mar 18 16:55 shuffle_49_517_0.data MemStore says: 15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far) 15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB. Not enough space even for 512 bytes?? The executors still have plenty of free memory: 0 slave1:40778 0 0.0 B / 529.9 MB 0.0 B 16 0 15047 15063 2.17 h 0.0 B 402.3 MB 768.0 B 1 slave2:50452 0 0.0 B / 529.9 MB 0.0 B 16 0 14447 14463 2.17 h 0.0 B 388.8 MB 1248.0 B 1 lvs02:47325 116 27.6 MB / 529.9 MB 0.0 B 8 0 58169 58177 3.16 h 893.5 MB 624.0 B 1189.9 MB driver lvs02:47041 0 0.0 B / 529.9 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 0.0 B Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster. 3 dedicated servers, each with a 16-core CPU + 16GB memory and Gigabit network. CPU load is quite low, about 1~3 from top, and network usage is far from saturated. I don't even do any useful complex calculations in this small Simple App yet.
Re: Spark Job History Server
The YARN package is not on your classpath. You need to build your Spark with YARN support. You can read these docs. http://spark.apache.org/docs/1.3.0/running-on-yarn.html Thanks Best Regards On Wed, Mar 18, 2015 at 4:07 PM, patcharee patcharee.thong...@uni.no wrote: I turned it on. But it failed to start. In the log, Spark assembly has been built with Hive, including Datanucleus jars on classpath Spark Command: /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/java -cp :/root/spark-1.3.0-bin-hadoop2.4/sbin/../conf:/root/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.history.HistoryServer 15/03/18 10:23:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:183) at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala) Patcharee On 18.
mars 2015 11:35, Akhil Das wrote: You can simply turn it on using: ./sbin/start-history-server.sh Read more here http://spark.apache.org/docs/1.3.0/monitoring.html. Thanks Best Regards On Wed, Mar 18, 2015 at 4:00 PM, patcharee patcharee.thong...@uni.no wrote: Hi, I am using spark 1.3. I would like to use Spark Job History Server. I added the following lines into conf/spark-defaults.conf spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider spark.yarn.historyServer.address sandbox.hortonworks.com:19888 But got Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider What class is really needed? How to fix it? Br, Patcharee
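For what it's worth, the documented invocation for building Spark 1.3 with the YARN profile is roughly the following (the Hadoop version shown is an assumption -- match it to your cluster):

```
# From the Spark source root: build an assembly with YARN support
build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
```

Note this only bakes in the standard YARN support; it does not by itself provide the YarnHistoryProvider class referenced in the configuration above.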
Spark Job History Server
Hi, I am using spark 1.3. I would like to use Spark Job History Server. I added the following lines into conf/spark-defaults.conf spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider spark.yarn.historyServer.address sandbox.hortonworks.com:19888 But got Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider What class is really needed? How to fix it? Br, Patcharee
Re: Apache Spark ALS recommendations approach
Hi, If you do a cartesian join to predict users' preferences over all the products, I think that 8 nodes with 64GB RAM would not be enough for the data. Recently, I used ALS for a similar situation, but with just 10M users and 0.1M products, and the minimum requirement was 9 nodes with 10GB RAM. Moreover, even if the program passes, the processing time will be very long. Maybe you should try to reduce the set to predict for each client, as in practice you never need to predict the preferences over all products to make a recommendation. Hope this will be helpful. Cheers Gen On Wed, Mar 18, 2015 at 12:13 PM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Trying to build recommendation system using Spark MLLib's ALS. Currently, we're trying to pre-build recommendations for all users on daily basis. We're using simple implicit feedbacks and ALS. The problem is, we have 20M users and 30M products, and to call the main predict() method, we need to have the cartesian join for users and products, which is too huge, and it may take days to generate only the join. Is there a way to avoid cartesian join to make the process faster? Currently we have 8 nodes with 64Gb of RAM, I think it should be enough for the data. val users: RDD[Int] = ??? // RDD with 20M userIds val products: RDD[Int] = ??? // RDD with 30M productIds val ratings : RDD[Rating] = ??? // RDD with all user-product feedbacks val model = new ALS().setRank(10).setIterations(10) .setLambda(0.0001).setImplicitPrefs(true) .setAlpha(40).run(ratings) val usersProducts = users.cartesian(products) val recommendations = model.predict(usersProducts)
Re: Apache Spark ALS recommendations approach
Thanks much for your reply. By saying on the fly, you mean caching the trained model, and querying it for each user joined with 30M products when needed? Our question is more about the general approach, what if we have 7M DAU? How the companies deal with that using Spark? On Wed, Mar 18, 2015 at 3:39 PM, Sean Owen so...@cloudera.com wrote: Not just the join, but this means you're trying to compute 600 trillion dot products. It will never finish fast. Basically: don't do this :) You don't in general compute all recommendations for all users, but recompute for a small subset of users that were or are likely to be active soon. (Or compute on the fly.) Is anything like that an option? On Wed, Mar 18, 2015 at 7:13 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Trying to build recommendation system using Spark MLLib's ALS. Currently, we're trying to pre-build recommendations for all users on daily basis. We're using simple implicit feedbacks and ALS. The problem is, we have 20M users and 30M products, and to call the main predict() method, we need to have the cartesian join for users and products, which is too huge, and it may take days to generate only the join. Is there a way to avoid cartesian join to make the process faster? Currently we have 8 nodes with 64Gb of RAM, I think it should be enough for the data. val users: RDD[Int] = ??? // RDD with 20M userIds val products: RDD[Int] = ???// RDD with 30M productIds val ratings : RDD[Rating] = ??? // RDD with all user-product feedbacks val model = new ALS().setRank(10).setIterations(10) .setLambda(0.0001).setImplicitPrefs(true) .setAlpha(40).run(ratings) val usersProducts = users.cartesian(products) val recommendations = model.predict(usersProducts)
Integration of Spark1.2.0 cdh4 with Jetty 9.2.10
Hi all, We are using spark-assembly-1.2.0-hadoop 2.0.0-mr1-cdh4.2.0.jar in our application. When we try to deploy the application on Jetty (jetty-distribution-9.2.10.v20150310) we get the below exception at the server startup. Initially we were getting the below exception, Caused by: java.lang.IllegalArgumentException: The servletContext ServletContext@o.e.j.s.ServletContextHandler{/static,null} org.eclipse.jetty.servlet.ServletContextHandler$Context is not org.eclipse.jetty.server.handler.ContextHandler$Context at org.eclipse.jetty.servlet.DefaultServlet.initContextHandler(DefaultServlet.java:310) at org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:175) at javax.servlet.GenericServlet.init(GenericServlet.java:242) at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:532) Then we tweaked the jetty-server and jetty-util jars, and now we are getting the below exception Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.server.bio.SocketConnector at java.net.URLClassLoader$1.run(Unknown Source) at java.net.URLClassLoader$1.run(Unknown Source) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(Unknown Source) at org.eclipse.jetty.webapp.WebAppClassLoader.findClass(WebAppClassLoader.java:510) at org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:441) at org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:403) Request you to please suggest some solution to this. Regards, Sayantini
Re: Spark Job History Server
Hi, My Spark was compiled with the YARN profile; I can run Spark on YARN without problems. For the Spark job history server problem, I checked spark-assembly-1.3.0-hadoop2.4.0.jar and found that the package org.apache.spark.deploy.yarn.history is missing. I don't know why. BR, Patcharee On 18. mars 2015 11:43, Akhil Das wrote: The YARN package is not on your classpath. You need to build your Spark with YARN support. You can read these docs. http://spark.apache.org/docs/1.3.0/running-on-yarn.html Thanks Best Regards On Wed, Mar 18, 2015 at 4:07 PM, patcharee patcharee.thong...@uni.no wrote: I turned it on. But it failed to start. In the log, Spark assembly has been built with Hive, including Datanucleus jars on classpath Spark Command: /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/java -cp :/root/spark-1.3.0-bin-hadoop2.4/sbin/../conf:/root/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.history.HistoryServer 15/03/18 10:23:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:183) at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala) Patcharee On 18. mars 2015 11:35, Akhil Das wrote: You can simply turn it on using: ./sbin/start-history-server.sh Read more here http://spark.apache.org/docs/1.3.0/monitoring.html. Thanks Best Regards On Wed, Mar 18, 2015 at 4:00 PM, patcharee patcharee.thong...@uni.no wrote: Hi, I am using spark 1.3. I would like to use Spark Job History Server. I added the following lines into conf/spark-defaults.conf spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider spark.yarn.historyServer.address sandbox.hortonworks.com:19888 But got Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider What class is really needed? How to fix it? Br, Patcharee
sparksql native jdbc driver
hey guys, In my understanding SparkSQL only supports JDBC connections through the Hive Thrift server, is this correct? Thanks
RE: [spark-streaming] can shuffle write to disk be disabled?
From the log you pasted I think this (-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final shuffle result. As I said, did you think shuffle is the bottleneck which makes your job run slowly? Maybe you should identify the cause at first. Besides, from the log it looks like your memory is not enough to cache the data; maybe you should increase the memory size of the executor. Thanks Jerry From: Darren Hoo [mailto:darren@gmail.com] Sent: Wednesday, March 18, 2015 6:41 PM To: Akhil Das Cc: user@spark.apache.org Subject: Re: [spark-streaming] can shuffle write to disk be disabled? I've already done that: From SparkUI Environment Spark properties has: spark.shuffle.spill false On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I think you can disable it with spark.shuffle.spill=false Thanks Best Regards On Wed, Mar 18, 2015 at 3:39 PM, Darren Hoo darren@gmail.com wrote: Thanks, Shao On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote: Yeah, as I said your job processing time is much larger than the sliding window, and streaming jobs are executed one by one in sequence, so the next job will wait until the first job is finished, so the total latency will be accumulated. I think you need to identify the bottleneck of your job at first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but finally the shuffle data will be written to disk; this cannot be disabled, unless you mount your spark.tmp.dir on ramdisk. I have increased spark.shuffle.memoryFraction to 0.8, which I can see from SparkUI's environment variables. But spill always happens even from the start when latency is less than the slide window (I changed it to 10 seconds); the shuffle data written to disk really has a snowball effect, it slows down eventually.
I noticed that the files spilled to disk are all very small in size but huge in number: total 344K drwxr-xr-x 2 root root 4.0K Mar 18 16:55 . drwxr-xr-x 66 root root 4.0K Mar 18 16:39 .. -rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data -rw-r--r-- 1 root root 75K Mar 18 16:54 shuffle_48_419_0.data -rw-r--r-- 1 root root 36K Mar 18 16:54 shuffle_48_518_0.data -rw-r--r-- 1 root root 69K Mar 18 16:55 shuffle_49_319_0.data -rw-r--r-- 1 root root 330 Mar 18 16:55 shuffle_49_418_0.data -rw-r--r-- 1 root root 65K Mar 18 16:55 shuffle_49_517_0.data MemStore says: 15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory. 15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far) 15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB. Not enough space even for 512 bytes?? The executors still have plenty of free memory: 0 slave1:40778 0 0.0 B / 529.9 MB 0.0 B 16 0 15047 15063 2.17 h 0.0 B 402.3 MB 768.0 B 1 slave2:50452 0 0.0 B / 529.9 MB 0.0 B 16 0 14447 14463 2.17 h 0.0 B 388.8 MB 1248.0 B 1 lvs02:47325 116 27.6 MB / 529.9 MB 0.0 B 8 0 58169 58177 3.16 h 893.5 MB 624.0 B 1189.9 MB driver lvs02:47041 0 0.0 B / 529.9 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 0.0 B Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster. 3 dedicated servers, each with a 16-core CPU + 16GB memory and Gigabit network. CPU load is quite low, about 1~3 from top, and network usage is far from saturated. I don't even do any useful complex calculations in this small Simple App yet.
DataFrame operation on parquet: GC overhead limit exceeded
Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a standalone cluster mode. The code is the following: val people = sqlContext.parquetFile("/data.parquet"); val res = people.groupBy("name","date").agg(sum("power"),sum("supply")).take(10); System.out.println(res); The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded My configuration is: spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.memory 6g spark.executor.extraJavaOptions -XX:+UseCompressedOops spark.shuffle.manager sort Any idea how I can work around this? Thanks a lot
Re: sparksql native jdbc driver
Yes, I have been using Spark SQL from the onset. Haven't found any other server for Spark SQL JDBC connectivity. On Wed, Mar 18, 2015 at 5:50 PM, sequoiadb mailing-list-r...@sequoiadb.com wrote: hey guys, In my understanding SparkSQL only supports JDBC connection through hive thrift server, is this correct? Thanks -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
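As context: yes, JDBC access to Spark SQL goes through the HiveServer2-compatible Thrift server that ships with Spark. A hedged sketch of starting it and connecting with the bundled Beeline client (the master URL and host/port are illustrative defaults -- adjust to your cluster):

```
# Start the Thrift JDBC/ODBC server from the Spark install root
./sbin/start-thriftserver.sh --master spark://master:7077

# Connect with Beeline (the server listens on port 10000 by default)
./bin/beeline -u jdbc:hive2://localhost:10000
```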
Re: Apache Spark ALS recommendations approach
Not just the join, but this means you're trying to compute 600 trillion dot products. It will never finish fast. Basically: don't do this :) You don't in general compute all recommendations for all users, but recompute for a small subset of users that were or are likely to be active soon. (Or compute on the fly.) Is anything like that an option? On Wed, Mar 18, 2015 at 7:13 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Trying to build recommendation system using Spark MLLib's ALS. Currently, we're trying to pre-build recommendations for all users on daily basis. We're using simple implicit feedbacks and ALS. The problem is, we have 20M users and 30M products, and to call the main predict() method, we need to have the cartesian join for users and products, which is too huge, and it may take days to generate only the join. Is there a way to avoid cartesian join to make the process faster? Currently we have 8 nodes with 64Gb of RAM, I think it should be enough for the data. val users: RDD[Int] = ??? // RDD with 20M userIds val products: RDD[Int] = ???// RDD with 30M productIds val ratings : RDD[Rating] = ??? // RDD with all user-product feedbacks val model = new ALS().setRank(10).setIterations(10) .setLambda(0.0001).setImplicitPrefs(true) .setAlpha(40).run(ratings) val usersProducts = users.cartesian(products) val recommendations = model.predict(usersProducts)
Apache Spark ALS recommendations approach
Hi all, Trying to build recommendation system using Spark MLLib's ALS. Currently, we're trying to pre-build recommendations for all users on daily basis. We're using simple implicit feedbacks and ALS. The problem is, we have 20M users and 30M products, and to call the main predict() method, we need to have the cartesian join for users and products, which is too huge, and it may take days to generate only the join. Is there a way to avoid cartesian join to make the process faster? Currently we have 8 nodes with 64Gb of RAM, I think it should be enough for the data. val users: RDD[Int] = ??? // RDD with 20M userIds val products: RDD[Int] = ???// RDD with 30M productIds val ratings : RDD[Rating] = ??? // RDD with all user-product feedbacks val model = new ALS().setRank(10).setIterations(10) .setLambda(0.0001).setImplicitPrefs(true) .setAlpha(40).run(ratings) val usersProducts = users.cartesian(products) val recommendations = model.predict(usersProducts) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-ALS-recommendations-approach-tp22116.html
Re: [spark-streaming] can shuffle write to disk be disabled?
On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai saisai.s...@intel.com wrote: From the log you pasted I think this (-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final shuffle result. Why is the shuffle result written to disk? As I said, did you think shuffle is the bottleneck which makes your job run slowly? I am quite new to Spark, so I am just making wild guesses. Which information should I provide further that can help to find the real bottleneck? Maybe you should identify the cause at first. Besides, from the log it looks like your memory is not enough to cache the data; maybe you should increase the memory size of the executor. Running two executors, the memory usage is quite low: executor 0 8.6 MB / 4.1 GB executor 1 23.9 MB / 4.1 GB driver 0.0 B / 529.9 MB submitted with args : --executor-memory 8G --num-executors 2 --driver-memory 1G
srcAttr in graph.triplets don't update when the size of graph is huge
when the size of the graph is huge (0.2 billion vertices, 6 billion edges), the srcAttr and dstAttr in graph.triplets don't update when using Graph.outerJoinVertices (when the data in a vertex is changed). The code and the log are as follows: g = graph.outerJoinVertices()... g.vertices.count() g.edges.count() println("example edge " + g.triplets.filter(e => e.srcId == 51L).collect() .map(e => (e.srcId + ":" + e.srcAttr + "," + e.dstId + ":" + e.dstAttr)).mkString("\n")) println("example vertex " + g.vertices.filter(e => e._1 == 51L).collect() .map(e => (e._1 + "," + e._2)).mkString("\n")) the result: example edge 51:0, 2467451620:61 51:0, 1962741310:83 // attr of vertex 51 is 0 in Graph.triplets example vertex 51,2 // attr of vertex 51 is 2 in Graph.vertices when the graph is smaller (10 million vertices), the code is OK; the triplets update when the vertex is changed
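For anyone reproducing this, note that outerJoinVertices returns a new graph rather than mutating the original, so the triplets must be read from the returned graph. A hedged sketch of the intended pattern (the element types and the `updates` RDD are illustrative assumptions):

```scala
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumed to exist: graph: Graph[Int, Int] and updates: RDD[(VertexId, Int)].
val g: Graph[Int, Int] = graph.outerJoinVertices(updates) {
  (id, oldAttr, newOpt) => newOpt.getOrElse(oldAttr) // keep old value when no update
}
// Read triplets from the returned graph `g`, not the original `graph`:
g.triplets.filter(_.srcId == 51L).collect()
```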
Re: Spark Job History Server
Those classes are not part of standard Spark. You may want to contact Hortonworks directly if they're suggesting you use those. On Wed, Mar 18, 2015 at 3:30 AM, patcharee patcharee.thong...@uni.no wrote: Hi, I am using spark 1.3. I would like to use Spark Job History Server. I added the following line into conf/spark-defaults.conf spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider spark.yarn.historyServer.address sandbox.hortonworks.com:19888 But got Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider What class is really needed? How to fix it? Br, Patcharee -- Marcelo
Re: Apache Spark ALS recommendations approach
I don't think that you need to put the whole joined data set in memory. However, memory is unlikely to be the limiting factor; it's the massive shuffle. OK, you really do have a large recommendation problem if you're recommending for at least 7M users per day! My hunch is that it won't be fast enough to use the simple predict() or recommendProducts() method repeatedly. There was a proposal to make a recommendAll() method which you could crib (https://issues.apache.org/jira/browse/SPARK-3066) but that looks like it's still a work in progress, since the point there was to do more work to make it possibly scale. You may consider writing a bit of custom code to do the scoring. For example, cache parts of the item-factor matrix in memory on the workers and score user feature vectors in bulk against them. There's a different school of thought, which is to try to compute only what you need, on the fly, and cache it if you like. That is good in that it doesn't waste effort and makes the result fresh, but, of course, means creating or consuming some other system to do the scoring and getting *that* to run fast. You can also look into techniques like LSH for probabilistically guessing which tiny subset of all items are worth considering, but that's also something that needs building more code. I'm sure a couple people could chime in on that here but it's kind of a separate topic. On Wed, Mar 18, 2015 at 8:04 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Thanks much for your reply. By saying on the fly, you mean caching the trained model, and querying it for each user joined with 30M products when needed? Our question is more about the general approach, what if we have 7M DAU? How the companies deal with that using Spark? On Wed, Mar 18, 2015 at 3:39 PM, Sean Owen so...@cloudera.com wrote: Not just the join, but this means you're trying to compute 600 trillion dot products. It will never finish fast.
Basically: don't do this :) You don't in general compute all recommendations for all users, but recompute for a small subset of users that were or are likely to be active soon. (Or compute on the fly.) Is anything like that an option? On Wed, Mar 18, 2015 at 7:13 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Trying to build recommendation system using Spark MLLib's ALS. Currently, we're trying to pre-build recommendations for all users on daily basis. We're using simple implicit feedbacks and ALS. The problem is, we have 20M users and 30M products, and to call the main predict() method, we need to have the cartesian join for users and products, which is too huge, and it may take days to generate only the join. Is there a way to avoid cartesian join to make the process faster? Currently we have 8 nodes with 64Gb of RAM, I think it should be enough for the data. val users: RDD[Int] = ??? // RDD with 20M userIds val products: RDD[Int] = ???// RDD with 30M productIds val ratings : RDD[Rating] = ??? // RDD with all user-product feedbacks val model = new ALS().setRank(10).setIterations(10) .setLambda(0.0001).setImplicitPrefs(true) .setAlpha(40).run(ratings) val usersProducts = users.cartesian(products) val recommendations = model.predict(usersProducts)
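A rough sketch of the "score in bulk against cached item factors" idea Sean describes -- illustrative only, and it assumes the product-factor matrix is small enough to broadcast to each worker (names like `recommendTopK` are hypothetical, not a Spark API):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

// Hypothetical helper: top-k products per user via a broadcast of item factors.
def recommendTopK(sc: SparkContext, model: MatrixFactorizationModel, k: Int) = {
  // Assumption: productFeatures fits in driver/executor memory after collect().
  val itemFactors = sc.broadcast(model.productFeatures.collect())
  model.userFeatures.mapValues { uf =>
    itemFactors.value
      .map { case (pid, pf) => (pid, (uf, pf).zipped.map(_ * _).sum) } // dot product
      .sortBy(-_._2)
      .take(k)
  }
}
```

With 30M products this broadcast would itself be several GB, so in practice you would score against a filtered candidate subset per user rather than the full matrix.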
Re: Difference among batchDuration, windowDuration, slideDuration
I think hsy541 is still confused by what is still confusing to me. Namely, what is the value that the sentence Each RDD in a DStream contains data from a certain interval is speaking of? This is from the Discretized Streams http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams section. The example makes it seem like the batchDuration is 4 seconds and then this mystery interval is 1 second? Where is this mystery interval defined? Or am I missing something altogether? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Difference-among-batchDuration-windowDuration-slideDuration-tp9966p22119.html
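To the best of my reading of the streaming guide, the "certain interval" is simply the batch interval passed to the StreamingContext constructor: each RDD of a DStream covers exactly one batch interval, and window and slide durations must be multiples of it. A sketch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("window-example")
// Batch interval = 1s: the DStream produces one RDD per second.
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
// windowDuration = 4s (each window spans 4 batch RDDs), slideDuration = 2s.
val windowed = lines.window(Seconds(4), Seconds(2))
```

So in the guide's figure, the 1-second "mystery interval" is the batch interval and the 4 seconds is the window duration layered on top of it.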
Re: DataFrame operation on parquet: GC overhead limit exceeded
You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a Parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in standalone cluster mode. The code is the following:

val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
System.out.println(res)

The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded. My configuration is:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort

Any idea how I can work around this? Thanks a lot
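For reference, spark.executor.memory can be set either in spark-defaults.conf alongside the properties quoted above, or programmatically on the SparkConf before the context is created; a hedged sketch, where the 8g value is just an example to size against your workers:

```scala
import org.apache.spark.SparkConf

// Equivalent to adding the line "spark.executor.memory 8g" to
// spark-defaults.conf. It must be set before the SparkContext is
// created; executor memory cannot be changed on a running application.
val conf = new SparkConf()
  .setAppName("parquet-aggregation")
  .set("spark.executor.memory", "8g")
```

Note the value has to fit inside what the standalone worker actually offers (8GB per worker here), or the executor will not launch.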
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
I suspect that you hit this bug: https://issues.apache.org/jira/browse/SPARK-6250 It depends on the actual contents of your query. Yin has opened a PR for this; although not merged yet, it should be a valid fix: https://github.com/apache/spark/pull/5078 This fix will be included in 1.3.1. Cheng On 3/18/15 10:04 PM, Roberto Coluccio wrote: Hi Cheng, thanks for your reply. The query is something like:

SELECT * FROM (
  SELECT m.column1, IF (d.columnA IS NOT NULL, d.columnA, m.column2), ..., m.columnN
  FROM tableD d RIGHT OUTER JOIN tableM m ON m.column2 = d.columnA
  WHERE m.column2 != "None" AND d.columnA != ""
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
) a

I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only (and fewer than 22) String fields. Hope the situation is a bit clearer. Thanks to anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the query? If it's confidential, could you please help construct a query that reproduces this issue? Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (tried both 1.2.0 and 1.2.1) I encounter a weird error, never occurred before, about which I'd kindly ask for any possible help.
In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for testing this stuff fail both if I build+test the project with Maven and if I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
Re: Apache Spark ALS recommendations approach
There is also a batch prediction API in a PR: https://github.com/apache/spark/pull/3098 The idea here is what Sean said... don't try to reconstruct the whole matrix, which will be dense, but pick a set of users and calculate top-k recommendations for them using dense level-3 BLAS. We are going to merge this for 1.4... this is useful in general for cross-validating on the prec@k measure to tune the model... Right now it uses level-1 BLAS, but the next extension is to use level-3 BLAS to make the compute even faster... On Mar 18, 2015 6:48 AM, Sean Owen so...@cloudera.com wrote: I don't think that you need memory to put the whole joined data set in memory. However memory is unlikely to be the limiting factor; it's the massive shuffle. OK, you really do have a large recommendation problem if you're recommending for at least 7M users per day! My hunch is that it won't be fast enough to use the simple predict() or recommendProducts() methods repeatedly. There was a proposal to make a recommendAll() method which you could crib (https://issues.apache.org/jira/browse/SPARK-3066) but that looks like still a work in progress, since the point there was to do more work to make it possibly scale. You may consider writing a bit of custom code to do the scoring. For example, cache parts of the item-factor matrix in memory on the workers and score user feature vectors in bulk against them. There's a different school of thought, which is to try to compute only what you need, on the fly, and cache it if you like. That is good in that it doesn't waste effort and makes the result fresh, but, of course, means creating or consuming some other system to do the scoring and getting *that* to run fast. You can also look into techniques like LSH for probabilistically guessing which tiny subset of all items are worth considering, but that's also something that needs building more code. I'm sure a couple people could chime in on that here but it's kind of a separate topic.
On Wed, Mar 18, 2015 at 8:04 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Thanks much for your reply. By saying on the fly, you mean caching the trained model, and querying it for each user joined with 30M products when needed? Our question is more about the general approach: what if we have 7M DAU? How do companies deal with that using Spark? On Wed, Mar 18, 2015 at 3:39 PM, Sean Owen so...@cloudera.com wrote: Not just the join: this means you're trying to compute 600 trillion dot products. It will never finish fast. Basically: don't do this :) You don't in general compute all recommendations for all users, but recompute for a small subset of users that were or are likely to be active soon. (Or compute on the fly.) Is anything like that an option? On Wed, Mar 18, 2015 at 7:13 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Trying to build a recommendation system using Spark MLlib's ALS. Currently, we're trying to pre-build recommendations for all users on a daily basis. We're using simple implicit feedback and ALS. The problem is, we have 20M users and 30M products, and to call the main predict() method, we need the cartesian join of users and products, which is huge, and it may take days to generate the join alone. Is there a way to avoid the cartesian join to make the process faster? Currently we have 8 nodes with 64GB of RAM each; I think that should be enough for the data.

val users: RDD[Int] = ???      // RDD with 20M userIds
val products: RDD[Int] = ???   // RDD with 30M productIds
val ratings: RDD[Rating] = ??? // RDD with all user-product feedback
val model = new ALS().setRank(10).setIterations(10)
  .setLambda(0.0001).setImplicitPrefs(true)
  .setAlpha(40).run(ratings)
val usersProducts = users.cartesian(products)
val recommendations = model.predict(usersProducts)
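Sean's suggestion of caching the item-factor matrix on the workers and scoring user vectors in bulk could be sketched roughly as below. This is a hedged illustration, not the proposed recommendAll() API: the topK name and k parameter are made up, and it assumes the item factors (or a subsample of them) fit in each worker's memory.

```scala
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.rdd.RDD

// Score every user against a broadcast copy of the item factors and keep
// only the k best products per user, instead of materializing the full
// 20M x 30M cartesian product.
def topK(model: MatrixFactorizationModel, k: Int): RDD[(Int, Array[(Int, Double)])] = {
  // Assumption: the item-factor matrix fits on each worker; with 30M
  // items you would subsample or block this first.
  val itemFactors = model.productFeatures.collect()
  val bItems = model.userFeatures.context.broadcast(itemFactors)
  model.userFeatures.map { case (user, uf) =>
    val scored = bItems.value.map { case (productId, pf) =>
      // Plain dot product of the rank-sized factor vectors.
      var dot = 0.0
      var i = 0
      while (i < uf.length) { dot += uf(i) * pf(i); i += 1 }
      (productId, dot)
    }
    (user, scored.sortBy(-_._2).take(k))
  }
}
```

Restricting model.userFeatures to the recently active subset before mapping, as Sean recommends, shrinks the work further.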
Re: sparksql native jdbc driver
Yes On 3/18/15 8:20 PM, sequoiadb wrote: hey guys, In my understanding SparkSQL only supports JDBC connections through the Hive Thrift server; is this correct? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
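For reference, a hedged sketch of using that JDBC path from a Spark distribution, assuming a default local setup (master, host, and port 10000 are just the usual defaults, adjust for your deployment):

```shell
# Start the Thrift JDBC/ODBC server that ships with Spark SQL...
./sbin/start-thriftserver.sh --master local[2]

# ...then connect to it over JDBC with the bundled beeline client.
./bin/beeline -u jdbc:hive2://localhost:10000
```

Any client speaking the HiveServer2 JDBC protocol can connect the same way; there is no separate native Spark SQL JDBC driver at this point.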
Re: Apache Spark ALS recommendations approach
Thanks, gen, for the helpful post. Thank you Sean, we're currently exploring this world of recommendations with Spark, and your posts are very helpful to us. We've noticed that you're a co-author of Advanced Analytics with Spark; just not to go too deep off-topic, will it be finished soon? On Wed, Mar 18, 2015 at 5:47 PM, Sean Owen so...@cloudera.com wrote: I don't think that you need memory to put the whole joined data set in memory. However memory is unlikely to be the limiting factor; it's the massive shuffle. OK, you really do have a large recommendation problem if you're recommending for at least 7M users per day! My hunch is that it won't be fast enough to use the simple predict() or recommendProducts() methods repeatedly. There was a proposal to make a recommendAll() method which you could crib (https://issues.apache.org/jira/browse/SPARK-3066) but that looks like still a work in progress, since the point there was to do more work to make it possibly scale. You may consider writing a bit of custom code to do the scoring. For example, cache parts of the item-factor matrix in memory on the workers and score user feature vectors in bulk against them. There's a different school of thought, which is to try to compute only what you need, on the fly, and cache it if you like. That is good in that it doesn't waste effort and makes the result fresh, but, of course, means creating or consuming some other system to do the scoring and getting *that* to run fast. You can also look into techniques like LSH for probabilistically guessing which tiny subset of all items are worth considering, but that's also something that needs building more code. I'm sure a couple people could chime in on that here but it's kind of a separate topic. On Wed, Mar 18, 2015 at 8:04 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Thanks much for your reply. By saying on the fly, you mean caching the trained model, and querying it for each user joined with 30M products when needed?
Our question is more about the general approach: what if we have 7M DAU? How do companies deal with that using Spark? On Wed, Mar 18, 2015 at 3:39 PM, Sean Owen so...@cloudera.com wrote: Not just the join: this means you're trying to compute 600 trillion dot products. It will never finish fast. Basically: don't do this :) You don't in general compute all recommendations for all users, but recompute for a small subset of users that were or are likely to be active soon. (Or compute on the fly.) Is anything like that an option? On Wed, Mar 18, 2015 at 7:13 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Trying to build a recommendation system using Spark MLlib's ALS. Currently, we're trying to pre-build recommendations for all users on a daily basis. We're using simple implicit feedback and ALS. The problem is, we have 20M users and 30M products, and to call the main predict() method, we need the cartesian join of users and products, which is huge, and it may take days to generate the join alone. Is there a way to avoid the cartesian join to make the process faster? Currently we have 8 nodes with 64GB of RAM each; I think that should be enough for the data.

val users: RDD[Int] = ???      // RDD with 20M userIds
val products: RDD[Int] = ???   // RDD with 30M productIds
val ratings: RDD[Rating] = ??? // RDD with all user-product feedback
val model = new ALS().setRank(10).setIterations(10)
  .setLambda(0.0001).setImplicitPrefs(true)
  .setAlpha(40).run(ratings)
val usersProducts = users.cartesian(products)
val recommendations = model.predict(usersProducts)
Column Similarity using DIMSUM
Hi, I am running Column Similarity (All-Pairs Similarity using DIMSUM) in Spark on a dataset that looks like (Entity, Attribute, Value), after transforming it to a row-oriented dense matrix format (one line per Attribute, one column per Entity, each cell holding a normalized value between 0 and 1). It runs extremely fast in computing similarities between Entities in most cases, but if there is even a single attribute which occurs frequently across the entities (say in 30% of entities), the job falls apart. The whole job gets stuck and worker nodes start running at 100% CPU without making any progress on the job stage. If the dataset is very small (in the range of 1000 Entities X 500 attributes, some frequently occurring), the job finishes but takes too long (sometimes it gives GC errors too). If none of the attributes occurs frequently (all 2%), then the job runs lightning fast (even for 100 Entities X 1 attributes) and the results are very accurate. I am running Spark 1.2.0-cdh5.3.0 on an 11-node cluster, each node having 4 cores and 16GB of RAM. My question is: is this behavior expected for datasets where some Attributes occur frequently? Thanks, Manish Gupta
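One thing worth checking against the behavior described above: RowMatrix.columnSimilarities takes an optional threshold, and a positive threshold enables DIMSUM's sampling, which specifically damps the contribution of frequently occurring (heavy) rows. A hedged sketch with made-up data:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// One row per Attribute, one column per Entity, as in the post.
val rows = sc.parallelize(Seq(
  Vectors.dense(0.9, 0.8, 0.7), // a frequently occurring attribute: dense row
  Vectors.dense(0.1, 0.0, 0.2)
))
val mat = new RowMatrix(rows)

// columnSimilarities() with no argument is exact; with threshold > 0 it
// samples heavy rows, trading a little accuracy for a much smaller
// shuffle, which is exactly where a 30%-frequency attribute hurts.
val approx = mat.columnSimilarities(0.1)
```

Without the threshold, a row that is non-zero in 30% of columns contributes on the order of (0.3 x numEntities)^2 pairs to the shuffle, which matches the stuck-at-100%-CPU symptom.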
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
You know, I actually have one of the columns called timestamp! This may really be causing the problem reported in the bug you linked, I guess. On Wed, Mar 18, 2015 at 3:37 PM, Cheng Lian lian.cs@gmail.com wrote: I suspect that you hit this bug: https://issues.apache.org/jira/browse/SPARK-6250 It depends on the actual contents of your query. Yin has opened a PR for this; although not merged yet, it should be a valid fix: https://github.com/apache/spark/pull/5078 This fix will be included in 1.3.1. Cheng On 3/18/15 10:04 PM, Roberto Coluccio wrote: Hi Cheng, thanks for your reply. The query is something like:

SELECT * FROM (
  SELECT m.column1, IF (d.columnA IS NOT NULL, d.columnA, m.column2), ..., m.columnN
  FROM tableD d RIGHT OUTER JOIN tableM m ON m.column2 = d.columnA
  WHERE m.column2 != "None" AND d.columnA != ""
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
) a

I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only (and fewer than 22) String fields. Hope the situation is a bit clearer. Thanks to anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the query? If it's confidential, could you please help construct a query that reproduces this issue? Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (tried both 1.2.0 and 1.2.1) I encounter a weird error, never occurred before, about which I'd kindly ask for any possible help.
In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for testing this stuff fail both if I build+test the project with Maven and if I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Hi Cheng, thanks for your reply. The query is something like:

SELECT * FROM (
  SELECT m.column1, IF (d.columnA IS NOT NULL, d.columnA, m.column2), ..., m.columnN
  FROM tableD d RIGHT OUTER JOIN tableM m ON m.column2 = d.columnA
  WHERE m.column2 != "None" AND d.columnA != ""
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
) a

I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only (and fewer than 22) String fields. Hope the situation is a bit clearer. Thanks to anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the query? If it's confidential, could you please help construct a query that reproduces this issue? Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (tried both 1.2.0 and 1.2.1) I encounter a weird error, never occurred before, about which I'd kindly ask for any possible help.
In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for testing this stuff fail both if I build+test the project with Maven and if I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
Did DataFrames break basic SQLContext?
I started to play with 1.3.0 and found that there are a lot of breaking changes. Previously, I could do the following:

case class Foo(x: Int)
val rdd = sc.parallelize(List(Foo(1)))
import sqlContext._
rdd.registerTempTable("foo")

Now, I am not able to directly use my RDD object and have it implicitly become a DataFrame. It can be used as a DataFrameHolder, with which I could write:

rdd.toDF.registerTempTable("foo")

But that is kind of a pain in comparison. The other problem for me is that I keep getting a SQLException: java.sql.SQLException: Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@10393e97, see the next exception for details. This seems to be a dependency on Hive, when previously (1.2.0) there was no such dependency. I can open tickets for these, but wanted to ask here first... maybe I am doing something wrong? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
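For anyone hitting the same migration, the 1.3 pattern is to import the implicits from the SQLContext instance and convert explicitly; a minimal sketch (table and field names are just examples):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// In 1.3 the implicit conversions live on the SQLContext *instance*,
// and they produce a DataFrameHolder rather than a DataFrame directly.
import sqlContext.implicits._

case class Foo(x: Int)
val rdd = sc.parallelize(List(Foo(1)))

// The explicit toDF() step replaces the 1.2-era implicit conversion.
rdd.toDF().registerTempTable("foo")
sqlContext.sql("SELECT x FROM foo").show()
```

The extra toDF() call is the intended API in 1.3; the implicit-to-DataFrame shortcut from 1.2 was deliberately narrowed.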
Re: DataFrame operation on parquet: GC overhead limit exceeded
Hi there, I set the executor memory to 8g but it didn't help. On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a Parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in standalone cluster mode. The code is the following:

val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
System.out.println(res)

The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded. My configuration is:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort

Any idea how I can work around this? Thanks a lot
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Hey Cheng, thank you so much for your suggestion: the problem was actually a column/field called timestamp in one of the case classes!! Once I changed its name, everything worked out fine again. Let me say it was kinda frustrating... Roberto On Wed, Mar 18, 2015 at 4:07 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote: You know, I actually have one of the columns called timestamp! This may really be causing the problem reported in the bug you linked, I guess. On Wed, Mar 18, 2015 at 3:37 PM, Cheng Lian lian.cs@gmail.com wrote: I suspect that you hit this bug: https://issues.apache.org/jira/browse/SPARK-6250 It depends on the actual contents of your query. Yin has opened a PR for this; although not merged yet, it should be a valid fix: https://github.com/apache/spark/pull/5078 This fix will be included in 1.3.1. Cheng On 3/18/15 10:04 PM, Roberto Coluccio wrote: Hi Cheng, thanks for your reply. The query is something like:

SELECT * FROM (
  SELECT m.column1, IF (d.columnA IS NOT NULL, d.columnA, m.column2), ..., m.columnN
  FROM tableD d RIGHT OUTER JOIN tableM m ON m.column2 = d.columnA
  WHERE m.column2 != "None" AND d.columnA != ""
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
  UNION ALL
  SELECT ... [another SELECT statement with different conditions but same tables]
) a

I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only (and fewer than 22) String fields. Hope the situation is a bit clearer. Thanks to anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the query? If it's confidential, could you please help construct a query that reproduces this issue?
Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (tried both 1.2.0 and 1.2.1) I encounter a weird error, never occurred before, about which I'd kindly ask for any possible help. In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for testing this stuff fail both if I build+test the project with Maven and if I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at
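The resolution in this thread (renaming a case-class field called timestamp) suggests a minimal repro along the following lines. This is a hedged sketch: the field names are hypothetical, and it reflects the 1.2.x parser behavior described above, addressed by the linked PR.

```scala
// In Spark 1.2.x, "timestamp" collides with a keyword in the SQL parser,
// so referencing a column by that name can fail with the
// "[1.x] failure: identifier expected" error seen in this thread.
case class Event(timestamp: String, value: String) // field name triggers the bug
val events = sc.parallelize(Seq(Event("2015-03-18", "v0")))
import sqlContext._ // 1.2-style implicit conversions on a plain SQLContext
events.registerTempTable("events")

// Fails in 1.2.x with java.lang.RuntimeException: ... identifier expected
sqlContext.sql("SELECT timestamp FROM events").collect()

// Renaming the field (e.g. to "ts"), as Roberto did, avoids the error.
```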
Re: RDD to InputStream
In case it would interest other people, here is what I came up with, and it seems to work fine:

case class RDDAsInputStream(private val rdd: RDD[String]) extends java.io.InputStream {
  private val bytes = rdd.flatMap(_.getBytes("UTF-8")).toLocalIterator
  def read(): Int = {
    if (bytes.hasNext) bytes.next & 0xFF // mask so bytes >= 0x80 aren't returned as negative
    else -1
  }
  override def markSupported(): Boolean = false
}

2015-03-13 13:56 GMT+01:00 Sean Owen so...@cloudera.com: OK, then you do not want to collect() the RDD. You can get an iterator, yes. There is no such thing as making an Iterator into an InputStream. An Iterator is a sequence of arbitrary objects; an InputStream is a channel to a stream of bytes. I think you can employ similar Guava / Commons utilities to turn an Iterator of Strings into a stream of Readers, join the Readers, and encode the result as bytes in an InputStream. On Fri, Mar 13, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote: Thanks Sean, I forgot to mention that the data is too big to be collected on the driver. So yes, your proposition would work in theory, but in my case I cannot hold all the data in the driver memory, therefore it wouldn't work. I guess the crucial point is to do the collect in a lazy way, and on that subject I noticed that we can get a local iterator from an RDD, but that raises two questions: - does that involve an immediate collect just like with collect(), or is it a lazy process? - how to go from an iterator to an InputStream? 2015-03-13 11:17 GMT+01:00 Sean Owen [hidden email]: These are quite different creatures.
You have a distributed set of Strings, but want a local stream of bytes, which involves three conversions:
- collect data to the driver
- concatenate strings in some way
- encode strings as bytes according to an encoding
Your approach is OK, but it might be faster to avoid disk, if you have enough memory:
- collect() to an Array[String] locally
- use Guava utilities to turn a bunch of Strings into a Reader
- use the Apache Commons ReaderInputStream to read it as encoded bytes
I might wonder if that's all really what you want to do though. On Fri, Mar 13, 2015 at 9:54 AM, Ayoub [hidden email] wrote: Hello, I need to convert an RDD[String] to a java.io.InputStream but I didn't find an easy way to do it. Currently I am saving the RDD as a temporary file and then opening an InputStream on the file, but that is not really optimal. Does anybody know a better way to do that? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-InputStream-tp22031.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-to-InputStream-tp22121.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
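Sean's in-memory route can be sketched as below. This is a hedged illustration that assumes the data fits on the driver (which Ayoub later notes it does not, hence his toLocalIterator variant), and the newline join is an arbitrary choice:

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils // commons-io dependency assumed

// 1. collect() to the driver, 2. concatenate, 3. expose as encoded bytes.
val local: Array[String] = rdd.collect()
val in: InputStream =
  IOUtils.toInputStream(local.mkString("\n"), StandardCharsets.UTF_8)
```

For data that cannot be collected, replacing collect() with toLocalIterator and streaming partition by partition, as in the RDDAsInputStream above this thread, trades driver memory for repeated partition fetches.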
Re: StorageLevel: OFF_HEAP
Thanks for the information. Will rebuild with 0.6.0 till the patch is merged. On Tue, Mar 17, 2015 at 7:24 PM, Ted Yu yuzhih...@gmail.com wrote: Ranga: Take a look at https://github.com/apache/spark/pull/4867 Cheers On Tue, Mar 17, 2015 at 6:08 PM, fightf...@163.com fightf...@163.com wrote: Hi, Ranga That's true. Typically a version mis-match issue. Note that spark 1.2.1 has tachyon built in with version 0.5.0 , I think you may need to rebuild spark with your current tachyon release. We had used tachyon for several of our spark projects in a production environment. Thanks, Sun. -- fightf...@163.com *From:* Ranga sra...@gmail.com *Date:* 2015-03-18 06:45 *To:* user@spark.apache.org *Subject:* StorageLevel: OFF_HEAP Hi I am trying to use the OFF_HEAP storage level in my Spark (1.2.1) cluster. The Tachyon (0.6.0-SNAPSHOT) nodes seem to be up and running. However, when I try to persist the RDD, I get the following error: ERROR [2015-03-16 22:22:54,017] ({Executor task launch worker-0} TachyonFS.java[connect]:364) - Invalid method name: 'getUserUnderfsTempFolder' ERROR [2015-03-16 22:22:54,050] ({Executor task launch worker-0} TachyonFS.java[getFileId]:1020) - Invalid method name: 'user_getFileId' Is this because of a version mis-match? On a different note, I was wondering if Tachyon has been used in a production environment by anybody in this group? Appreciate your help with this. - Ranga
Re: Did DataFrames break basic SQLContext?
It appears that the metastore_db problem is related to https://issues.apache.org/jira/browse/SPARK-4758. I had another shell open that was stuck. This is probably a bug, though? import sqlContext.implicits._ case class Foo(x: Int) val rdd = sc.parallelize(List(Foo(1))) rdd.toDF results in a frozen shell after this line: INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . which locks the internally created metastore_db On Wed, Mar 18, 2015 at 11:20 AM, Justin Pihony justin.pih...@gmail.com wrote: I started to play with 1.3.0 and found that there are a lot of breaking changes. Previously, I could do the following: case class Foo(x: Int) val rdd = sc.parallelize(List(Foo(1))) import sqlContext._ rdd.registerTempTable("foo") Now, I am not able to directly use my RDD object and have it implicitly become a DataFrame. It can be used as a DataFrameHolder, with which I could write: rdd.toDF.registerTempTable("foo") But, that is kind of a pain in comparison. The other problem for me is that I keep getting a SQLException: java.sql.SQLException: Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@10393e97, see the next exception for details. This seems to be a dependency on Hive, when previously (1.2.0) there was no such dependency. I can open tickets for these, but wanted to ask here first; maybe I am doing something wrong? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to get the cached RDD
sc.getPersistentRDDs(0).asInstanceOf[RDD[Array[Double]]] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-cached-RDD-tp22114p22122.html
RE: saveAsTable fails to save RDD in Spark SQL 1.3.0
/user/hive/warehouse is a hdfs location. I’ve changed the mod for this location but I’m still having the same issue. hduser@hadoop01-VirtualBox:/opt/spark/bin$ hdfs dfs -chmod -R 777 /user/hive hduser@hadoop01-VirtualBox:/opt/spark/bin$ hdfs dfs -ls /user/hive/warehouse Found 1 items 15/03/18 09:31:47 INFO DAGScheduler: Stage 3 (runJob at newParquet.scala:648) finished in 0.347 s 15/03/18 09:31:47 INFO DAGScheduler: Job 3 finished: runJob at newParquet.scala:648, took 0.549170 s Traceback (most recent call last): File stdin, line 1, in module File /opt/spark/python/pyspark/sql/dataframe.py, line 191, in saveAsTable self._jdf.saveAsTable(tableName, source, jmode, joptions) File /opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o49.saveAsTable. : java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/user/hive/warehouse/order04/_temporary/0/task_201503180931_0017_r_01/part-r-2.parquet; isDirectory=false; length=5591; replication=1; blocksize=33554432; modification_time=1426696307000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/user/hive/warehouse/order04/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:649) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:126) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308) at 
org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:217) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1048) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1018) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) any help is appreciated. Thanks From: fightf...@163.com [mailto:fightf...@163.com] Sent: March-17-15 6:33 PM To: Shahdad Moradi; user Subject: Re: saveAsTable fails to save RDD in Spark SQL 1.3.0 Looks like some authentification issues. Can you check that your current user had authority to operate (maybe r/w/x) on /user/hive/warehouse? Thanks, Sun. 
fightf...@163.com From: smoradi smor...@currenex.com Date: 2015-03-18 09:24 To: user@spark.apache.org Subject: saveAsTable fails to save RDD in Spark SQL 1.3.0 Hi, Basically my goal is to make the Spark SQL RDDs available to Tableau software through the Simba ODBC driver. I’m running standalone Spark 1.3.0 on Ubuntu 14.04. Got the source code and compiled it with Maven. Hive is also set up and connected to MySQL, all on the same machine. The hive-site.xml file has been copied to spark/conf. Here is the content of the hive-site.xml:
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:MySql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
mapPartitions - How Does it Works
I am trying to understand mapPartitions but I am still not sure how it works. In the below example it creates three partitions: val parallel = sc.parallelize(1 to 10, 3) and when we do parallel.mapPartitions(x => List(x.next).iterator).collect it prints Array[Int] = Array(1, 4, 7) Can someone please explain why it prints 1, 4, 7 only? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does-it-Works-tp22123.html
RE: mapPartitions - How Does it Works
Here is what I think: mapPartitions is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The combined result iterators are automatically converted into a new RDD. So in this case, the RDD (1, 2, ..., 10) is split into 3 partitions: (1,2,3), (4,5,6), (7,8,9,10). For every partition, your function gets the first element via x.next, uses it to build a List, and returns the iterator of that List. So the partitions return (1), (4) and (7) as 3 iterators, which are combined into one final RDD (1, 4, 7). Yong Date: Wed, 18 Mar 2015 10:19:34 -0700 From: ashish.us...@gmail.com To: user@spark.apache.org Subject: mapPartitions - How Does it Works I am trying to understand mapPartitions but I am still not sure how it works. In the below example it creates three partitions: val parallel = sc.parallelize(1 to 10, 3) and when we do parallel.mapPartitions(x => List(x.next).iterator).collect it prints Array[Int] = Array(1, 4, 7) Can someone please explain why it prints 1, 4, 7 only? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does-it-Works-tp22123.html
Re: mapPartitions - How Does it Works
mapPartitions works as follows: 1) For each partition of your RDD, it provides an iterator over the values within that partition 2) You then define a function that operates on that iterator Thus if you do the following: val parallel = sc.parallelize(1 to 10, 3) parallel.mapPartitions(x => x.map(s => s + 1)).collect You would get: res3: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11) In your example, x is not a pointer that traverses the iterator (e.g. with .next()), it's literally the Iterator object itself. On 3/18/15, 10:19 AM, ashish.usoni ashish.us...@gmail.com wrote: I am trying to understand mapPartitions but I am still not sure how it works. In the below example it creates three partitions: val parallel = sc.parallelize(1 to 10, 3) and when we do parallel.mapPartitions(x => List(x.next).iterator).collect it prints Array[Int] = Array(1, 4, 7) Can someone please explain why it prints 1, 4, 7 only? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does-it-Works-tp22123.html The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
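The partition-wise behavior described above can be simulated without Spark at all. The sketch below is plain Python (the helper names are made up for illustration); it assumes Spark's roughly even slicing of the range 1..10 into 3 partitions, which is what produces (1,2,3), (4,5,6), (7,8,9,10):

```python
def split_partitions(data, n):
    # Approximate Spark's even slicing of a sequence into n partitions
    size = len(data)
    return [data[i * size // n:(i + 1) * size // n] for i in range(n)]

def map_partitions(partitions, f):
    # Like RDD.mapPartitions: call f once per partition with an iterator
    # over that partition, then concatenate the resulting iterators
    return [x for part in partitions for x in f(iter(part))]

parts = split_partitions(list(range(1, 11)), 3)
print(parts)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# List(x.next).iterator: take only the FIRST element of each partition
print(map_partitions(parts, lambda it: [next(it)]))  # [1, 4, 7]

# x.map(s => s + 1): consume the whole iterator instead
print(map_partitions(parts, lambda it: [s + 1 for s in it]))
```

Because the questioner's function only calls next once per partition iterator, exactly one element per partition survives, hence 1, 4 and 7.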
RE: Column Similarity using DIMSUM
Hi Reza, I have tried threshold values only in the range of 0 to 1. I was not aware that the threshold can be set above 1. Will try and update. Thank You - Manish From: Reza Zadeh [mailto:r...@databricks.com] Sent: Wednesday, March 18, 2015 10:55 PM To: Manish Gupta 8 Cc: user@spark.apache.org Subject: Re: Column Similarity using DIMSUM Hi Manish, Did you try calling columnSimilarities(threshold) with different threshold values? Try threshold values of 0.1, 0.5, 1, 20, and higher. Best, Reza On Wed, Mar 18, 2015 at 10:40 AM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, I am running Column Similarity (All Pairs Similarity using DIMSUM) in Spark on a dataset that looks like (Entity, Attribute, Value) after transforming it to a row-oriented dense matrix format (one line per Attribute, one column per Entity, each cell with a normalized value – between 0 and 1). It runs extremely fast in computing similarities between Entities in most cases, but if there is even a single attribute which occurs frequently across the entities (say in 30% of entities), the job falls apart. The whole job gets stuck and worker nodes start running at 100% CPU without making any progress on the job stage. If the dataset is very small (in the range of 1000 Entities X 500 attributes (some frequently occurring)) the job finishes but takes too long (sometimes it gives GC errors too). If none of the attributes is frequently occurring (all 2%), then the job runs in a lightning fast manner (even for 100 Entities X 1 attributes) and results are very accurate. I am running Spark 1.2.0-cdh5.3.0 on an 11 node cluster, each having 4 cores and 16GB of RAM. My question is - Is this behavior expected for datasets where some Attributes frequently occur? Thanks, Manish Gupta
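For context on what columnSimilarities computes: it is all-pairs cosine similarity between the columns of the matrix, and the threshold prunes low-similarity pairs (DIMSUM additionally samples entries so that pruned pairs are mostly never computed, which is why frequent attributes, i.e. heavy rows, dominate the cost). A brute-force plain-Python reference, not the DIMSUM algorithm itself and not Spark API, with a made-up toy matrix:

```python
import math

def column_similarities(matrix, threshold=0.0):
    """Brute-force cosine similarity between every pair of columns,
    keeping only pairs at or above the threshold. DIMSUM approximates
    this by sampling instead of enumerating all pairs."""
    ncols = len(matrix[0])
    cols = [[row[j] for row in matrix] for j in range(ncols)]
    norms = [math.sqrt(sum(v * v for v in c)) for c in cols]
    sims = {}
    for i in range(ncols):
        for j in range(i + 1, ncols):
            if norms[i] == 0 or norms[j] == 0:
                continue
            dot = sum(a * b for a, b in zip(cols[i], cols[j]))
            s = dot / (norms[i] * norms[j])
            if s >= threshold:
                sims[(i, j)] = s
    return sims

# 3 entities (columns) x 2 attributes (rows)
m = [[1.0, 1.0, 0.0],
     [0.0, 1.0, 1.0]]
print(column_similarities(m, threshold=0.5))
```

With threshold 0.5, only the column pairs (0,1) and (1,2), each with cosine similarity 1/sqrt(2), survive; raising the threshold prunes more pairs, which is what Reza's suggestion of values above 1 exploits in the sampling scheme.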
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Hi Roberto, For now, if the timestamp is a top level column (not a field in a struct), you can use backticks to quote the column name, like `timestamp`. Thanks, Yin On Wed, Mar 18, 2015 at 12:10 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hey Cheng, thank you so much for your suggestion, the problem was actually a column/field called timestamp in one of the case classes!! Once I changed its name everything worked out fine again. Let me say it was kinda frustrating ... Roberto On Wed, Mar 18, 2015 at 4:07 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote: You know, I actually have one of the columns called timestamp! This may really cause the problem reported in the bug you linked, I guess. On Wed, Mar 18, 2015 at 3:37 PM, Cheng Lian lian.cs@gmail.com wrote: I suspect that you hit this bug https://issues.apache.org/jira/browse/SPARK-6250, it depends on the actual contents of your query. Yin had opened a PR for this; although not merged yet, it should be a valid fix https://github.com/apache/spark/pull/5078 This fix will be included in 1.3.1. Cheng On 3/18/15 10:04 PM, Roberto Coluccio wrote: Hi Cheng, thanks for your reply. The query is something like: SELECT * FROM ( SELECT m.column1, IF (d.columnA IS NOT null, d.columnA, m.column2), ..., m.columnN FROM tableD d RIGHT OUTER JOIN tableM m on m.column2 = d.columnA WHERE m.column2!=\None\ AND d.columnA!=\\ UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] ) a I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only (and fewer than 22) String fields. Hope the situation is a bit more clear. 
Thanks anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind to provide the query? If it's confidential, could you please help constructing a query that reproduces this issue? Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (tried both 1.2.0 and 1.2.1) I encounter a weird error never occurred before about which I'd kindly ask for any possible help. In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for testing this stuff fail both if I build+test the project with Maven and if I run then as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... 
(my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at
Database operations on executor nodes
I was wondering what people generally do about doing database operations from executor nodes. I’m (at least for now) avoiding doing database updates from executor nodes to avoid proliferation of database connections on the cluster. The general pattern I adopt is to collect queries (or tuples) on the executors and write to the database on the driver. // Executes on the executors rdd.foreach(s => { val query = s"insert into ${s};" accumulator += query }) // Executes on the driver accumulator.value.foreach(query => { // get connection // update database }) I’m obviously trading database connections for driver heap. How do other Spark users do it? Cheers Praveen
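A common middle ground between one-connection-per-record and driver-only writes is foreachPartition: open one connection per partition, write that partition's records as a batch, then close it. A plain-Python sketch of the idea (the helper names and the `object()` stand-in for a real DB connection are made up for illustration):

```python
def foreach_partition(partitions, f):
    # Simulates rdd.foreachPartition: f is called once per partition
    # with an iterator over that partition's records
    for part in partitions:
        f(iter(part))

connections_opened = []

def write_partition(records):
    # One connection per partition, not per record and not driver-only
    conn = object()  # stand-in for e.g. a JDBC / DB-API connection
    connections_opened.append(conn)
    batch = [f"insert into t values ({r});" for r in records]
    # a real implementation would execute the batch and close conn here
    return batch

partitions = [[1, 2, 3], [4, 5], [6]]
foreach_partition(partitions, write_partition)
print(len(connections_opened))  # 3: one connection per partition
```

This bounds the number of concurrent connections by the number of concurrently running tasks rather than by record count, and avoids funnelling all writes (and all accumulated queries) through driver heap.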
Null pointer exception reading Parquet
Hi All, I am using Spark version 1.2 running locally. When I try to read a parquet file I get the below exception; what might be the issue? Any help will be appreciated. This is the simplest operation/action on a parquet file. //code snippet// val sparkConf = new SparkConf().setAppName("Testing").setMaster("local[10]") val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") import sqlContext._ val temp = "local path to file" val temp2 = sqlContext.parquetFile(temp) temp2.printSchema //end code snippet //Exception trace Exception in thread main java.lang.NullPointerException at parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249) at parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543) at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:389) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:457) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:457) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:457) at org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:477) at org.apache.spark.sql.parquet.ParquetRelation.init(ParquetRelation.scala:65) at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:165) //End Exception trace -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Null-pointer-exception-reading-Parquet-tp22124.html
Re: Did DataFrames break basic SQLContext?
To answer your first question - yes, 1.3.0 did break backward compatibility for the change from SchemaRDD -> DataFrame. SparkSQL was an alpha component, so API-breaking changes could happen. It is no longer an alpha component as of 1.3.0, so this will not be the case in the future. Adding toDF should hopefully not be too much of an effort. For the second point - I also have seen these exceptions when upgrading jobs to 1.3.0 - but they don't fail my jobs. Not sure what the cause is; would be good to understand this. — Sent from Mailbox On Wed, Mar 18, 2015 at 5:22 PM, Justin Pihony justin.pih...@gmail.com wrote: I started to play with 1.3.0 and found that there are a lot of breaking changes. Previously, I could do the following: case class Foo(x: Int) val rdd = sc.parallelize(List(Foo(1))) import sqlContext._ rdd.registerTempTable("foo") Now, I am not able to directly use my RDD object and have it implicitly become a DataFrame. It can be used as a DataFrameHolder, with which I could write: rdd.toDF.registerTempTable("foo") But, that is kind of a pain in comparison. The other problem for me is that I keep getting a SQLException: java.sql.SQLException: Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@10393e97, see the next exception for details. This seems to be a dependency on Hive, when previously (1.2.0) there was no such dependency. I can open tickets for these, but wanted to ask here first; maybe I am doing something wrong? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html
Re: Spark + Kafka
I have used various versions of Spark (1.0, 1.2.1) without any issues. Though I have not significantly used Kafka with 1.3.0, preliminary testing revealed no issues. - khanderao On Mar 18, 2015, at 2:38 AM, James King jakwebin...@gmail.com wrote: Hi All, Which build of Spark is best when using Kafka? Regards jk
Re: 1.3 release
Sean, you are exactly right, as I learned from parsing your earlier reply more carefully -- sorry I didn't do this the first time. Setting hadoop.version was indeed the solution ./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver -Dhadoop.version=2.5.0-cdh5.3.2 Thanks for your help! Eric On Wed, Mar 18, 2015 at 4:19 AM, Sean Owen so...@cloudera.com wrote: I don't think this is the problem, but I think you'd also want to set -Dhadoop.version= to match your deployment version, if you're building for a particular version, just to be safe-est. I don't recall seeing that particular error before. It indicates to me that the SparkContext is null. Is this maybe a knock-on error from the SparkContext not initializing? I can see it would then cause this to fail to init. On Tue, Mar 17, 2015 at 7:16 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Yes, I did, with these arguments: --tgz -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver To be more specific about what is not working, when I launch spark-shell --master yarn, I get this error immediately after launch. I have no idea from looking at the source. java.lang.NullPointerException at org.apache.spark.sql.SQLContext.init(SQLContext.scala:141) at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:49) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:408) at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1027) at $iwC$$iwC.init(console:9) On Tue, Mar 17, 2015 at 7:43 AM, Sean Owen so...@cloudera.com wrote: OK, did you build with YARN support (-Pyarn)? and the right incantation of flags like -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.2 or similar? 
On Tue, Mar 17, 2015 at 2:39 PM, Eric Friedman eric.d.fried...@gmail.com wrote: I did not find that the generic build worked. In fact I also haven't gotten a build from source to work either, though that one might be a case of PEBCAK. In the former case I got errors about the build not having YARN support. On Sun, Mar 15, 2015 at 3:03 AM, Sean Owen so...@cloudera.com wrote: I think (I hope) it's because the generic builds just work. Even though these are of course distributed mostly verbatim in CDH5, with tweaks to be compatible with other stuff at the edges, the stock builds should be fine too. Same for HDP as I understand. The CDH4 build may work on some builds of CDH4, but I think is lurking there as a Hadoop 2.0.x plus a certain YARN beta build. I'd prefer to rename it that way, myself, since it doesn't actually work with all of CDH4 anyway. Are the MapR builds there because the stock Hadoop build doesn't work on MapR? that would actually surprise me, but then, why are these two builds distributed? On Sun, Mar 15, 2015 at 6:22 AM, Eric Friedman eric.d.fried...@gmail.com wrote: Is there a reason why the prebuilt releases don't include current CDH distros and YARN support? Eric Friedman - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
topic modeling using LDA in MLLib
I'm coming from a Hadoop background but I'm totally new to Apache Spark. I'd like to do topic modeling using the LDA algorithm on some txt files. The example on the Spark website assumes that the input to the LDA is a file containing the word counts. I wonder if someone could help me figure out the steps to start from actual txt documents (actual content) and come up with the actual topics. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/topic-modeling-using-LDA-in-MLLib-tp22128.html
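The missing step is turning raw documents into the term-count vectors that MLlib's LDA expects as input. A plain-Python sketch of that preprocessing (tokenize, drop stopwords, build a vocabulary, count) follows; the function name, sample documents, and stopword list are made up for illustration, and in Spark the same steps would be expressed as RDD transformations producing Vectors:

```python
import re
from collections import Counter

def docs_to_counts(docs, stopwords=frozenset()):
    """Turn raw documents into (vocabulary, per-document term-count
    vectors), the shape of data LDA training consumes."""
    tokenized = [
        [w for w in re.findall(r"[a-z]+", d.lower()) if w not in stopwords]
        for d in docs
    ]
    # One fixed, sorted vocabulary shared by all documents
    vocab = sorted({w for doc in tokenized for w in doc})
    vectors = []
    for doc in tokenized:
        counts = Counter(doc)
        vectors.append([counts.get(w, 0) for w in vocab])
    return vocab, vectors

docs = ["Spark does topic modeling", "LDA models topics in Spark"]
vocab, vectors = docs_to_counts(docs, stopwords={"does", "in"})
print(vocab)    # ['lda', 'modeling', 'models', 'spark', 'topic', 'topics']
print(vectors)  # [[0, 1, 0, 1, 1, 0], [1, 0, 1, 1, 0, 1]]
```

Each row of `vectors` is one document's word-count vector over the shared vocabulary; after training, a topic is read back by mapping the highest-weight indices of each topic distribution through `vocab`.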
Re: DataFrame operation on parquet: GC overhead limit exceeded
Hi Yin, Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea on how to deal with this situation? Thanks a lot On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote: Seems there are too many distinct groups processed in a task, which triggers the problem. How many files does your dataset have and how large is a file? Seems your query will be executed in two stages, table scan and map-side aggregation in the first stage and the final round of reduce-side aggregation in the second stage. Can you take a look at the numbers of tasks launched in these two stages? Thanks, Yin On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I set the executor memory to 8g but it didn't help On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a standalone cluster mode. The code is the following: val people = sqlContext.parquetFile("/data.parquet") val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10) println(res) The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded My configuration is: spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.memory 6g spark.executor.extraJavaOptions -XX:+UseCompressedOops spark.shuffle.manager sort Any idea how can I work around this? Thanks a lot
Re: Spark + HBase + Kerberos
Hi Ted, The spark executors and hbase regions/masters are all collocated. This is a 2 node test environment. Best, Eric Eric Walk, Sr. Technical Consultant p: 617.855.9255 | NASDAQ: PRFT | Perficient.comhttp://www.perficient.com/ From: Ted Yu yuzhih...@gmail.com Sent: Mar 18, 2015 2:46 PM To: Eric Walk Cc: user@spark.apache.org;Bill Busch Subject: Re: Spark + HBase + Kerberos Are hbase config / keytab files deployed on executor machines ? Consider adding -Dsun.security.krb5.debug=true for debug purposes. Cheers On Wed, Mar 18, 2015 at 11:39 AM, Eric Walk eric.w...@perficient.com wrote: Having an issue connecting to HBase from a Spark container in a Secure Cluster. Haven’t been able to get past this issue; any thoughts would be appreciated. We’re able to perform some operations like “CreateTable” in the driver thread successfully. Read requests (always in the executor threads) are always failing with: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] Logs and scala are attached; the names of the innocent have been masked for their protection (in a consistent manner). 
Executing the following spark job (using HDP 2.2, Spark 1.2.0, HBase 0.98.4, Kerberos on AD): export SPARK_CLASSPATH=/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-server.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-protocol.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-hadoop2-compat.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-client.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-common.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/htrace-core-3.0.4.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/guava-12.0.1.jar:/usr/hdp/2.2.0.0-2041/hbase/conf /usr/hdp/2.2.0.0-2041/spark/bin/spark-submit --class HBaseTest --driver-memory 2g --executor-memory 1g --executor-cores 1 --num-executors 1 --master yarn-client ~/spark-test_2.10-1.0.jar We see this error in the executor processes (attached as yarn log.txt): 2015-03-18 17:34:15,121 DEBUG [Executor task launch worker-0] security.HBaseSaslRpcClient: Creating SASL GSSAPI client. Server's Kerberos principal name is hbase/ldevawshdp0002.dc1.pvc@dc1.PVC 2015-03-18 17:34:15,128 WARN [Executor task launch worker-0] ipc.RpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] 2015-03-18 17:34:15,129 ERROR [Executor task launch worker-0] ipc.RpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'. 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] The HBase Master Logs show success: 2015-03-18 17:34:12,861 DEBUG [RpcServer.listener,port=6] ipc.RpcServer: RpcServer.listener,port=6: connection from 10.4.0.6:46636http://10.4.0.6:46636; # active connections: 3 2015-03-18 17:34:12,872 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Kerberos principal name is hbase/ldevawshdp0001.dc1.pvc@DC1.PVC 2015-03-18 17:34:12,875 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Created SASL server with mechanism = GSSAPI 2015-03-18 17:34:12,875 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Have read input token of size 1501 for processing by saslServer.evaluateResponse() 2015-03-18 17:34:12,876 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Will send token of size 108 from saslServer. 2015-03-18 17:34:12,877 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Have read input token of size 0 for processing by saslServer.evaluateResponse() 2015-03-18 17:34:12,878 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Will send token of size 32 from saslServer. 2015-03-18 17:34:12,878 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Have read input token of size 32 for processing by saslServer.evaluateResponse() 2015-03-18 17:34:12,879 DEBUG [RpcServer.reader=3,port=6] security.HBaseSaslRpcServer: SASL server GSSAPI callback: setting canonicalized client ID: user1@DC1.PVC 2015-03-18 17:34:12,895 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: SASL server context established. Authenticated client: user1@DC1.PVC (auth:SIMPLE). Negotiated QoP is auth 2015-03-18 17:34:29,313 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: RpcServer.listener,port=6: DISCONNECTING client 10.4.0.6:46636http://10.4.0.6:46636 because read count=-1. 
Number of active connections: 3 2015-03-18 17:34:37,102 DEBUG [RpcServer.listener,port=6] ipc.RpcServer: RpcServer.listener,port=6: connection from 10.4.0.6:46733http://10.4.0.6:46733; # active connections: 3 2015-03-18 17:34:37,102 DEBUG [RpcServer.reader=4,port=6] ipc.RpcServer: RpcServer.listener,port=6: DISCONNECTING client 10.4.0.6:46733http://10.4.0.6:46733 because read count=-1. Number of active connections: 3 The Spark Driver
Re: mapPartitions - How Does it Work
List(x.next).iterator is giving you the first element from each partition, which would be 1, 4 and 7 respectively. On 3/18/15, 10:19 AM, ashish.usoni ashish.us...@gmail.com wrote: I am trying to understand mapPartitions, but I am still not sure how it works. In the example below it creates three partitions: val parallel = sc.parallelize(1 to 10, 3) and when we do parallel.mapPartitions( x => List(x.next).iterator).collect it prints the value Array[Int] = Array(1, 4, 7) Can someone please explain why it prints only 1, 4 and 7? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does-it-Works-tp22123.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
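The behaviour asked about above can be sketched in plain Python, without Spark. The helper names parallelize and map_partitions below are made-up stand-ins for the RDD API; the point is that a range split into 3 contiguous partitions starts at 1, 4 and 7, and taking only the first element of each partition's iterator yields exactly those values.

```python
# Plain-Python sketch (no Spark) of the example above: split 1..10 into
# three partitions the way Spark partitions a range, then keep only the
# first element of each partition's iterator.

def parallelize(data, num_partitions):
    """Split data into contiguous partitions, like Spark does for a range."""
    n = len(data)
    parts = []
    for i in range(num_partitions):
        start = i * n // num_partitions
        end = (i + 1) * n // num_partitions
        parts.append(data[start:end])
    return parts

def map_partitions(partitions, f):
    """Apply f to each partition's iterator and concatenate the results,
    mimicking RDD.mapPartitions(...).collect()."""
    result = []
    for p in partitions:
        result.extend(f(iter(p)))
    return result

parts = parallelize(list(range(1, 11)), 3)
print(parts)                                          # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
print(map_partitions(parts, lambda it: [next(it)]))   # [1, 4, 7]
```

So the function passed to mapPartitions runs once per partition, not once per element, which is why only three values come back.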
Re: [spark-streaming] can shuffle write to disk be disabled?
Hi, Saisai

Here is the duration of one of the jobs, 22 seconds in total; it is longer than the sliding window.

Stage Id  Description                    Submitted            Duration  Tasks: Succeeded/Total  Input    Shuffle Read  Shuffle Write
342       foreach at SimpleApp.scala:58  2015/03/18 15:06:58  16 s      288/288                          10.6 MB
341       window at SimpleApp.scala:51   2015/03/18 15:06:52  6 s       288/288                 12.3 MB                10.6 MB

And part of the driver log:

15/03/18 15:16:36 INFO DStreamGraph: Cleared checkpoint data for time 1426662996000 ms
15/03/18 15:16:36 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1426662932000 ms)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 81.0 in stage 392.0 (TID 100515, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 75.0 in stage 392.0 (TID 100509) in 370 ms on lvs02 (75/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 82.0 in stage 392.0 (TID 100516, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 77.0 in stage 392.0 (TID 100511) in 261 ms on lvs02 (76/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 83.0 in stage 392.0 (TID 100517, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 78.0 in stage 392.0 (TID 100512) in 274 ms on lvs02 (77/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 84.0 in stage 392.0 (TID 100518, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 74.0 in stage 392.0 (TID 100508) in 569 ms on lvs02 (78/291)
15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996000 in memory on lvs02:38954 (size: 398.3 KB, free: 1073.7 MB)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 85.0 in stage 392.0 (TID 100519, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 76.0 in stage 392.0 (TID 100510) in 539 ms on lvs02 (79/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 86.0 in stage 392.0 (TID 100520, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18
15:16:36 INFO TaskSetManager: Finished task 80.0 in stage 392.0 (TID 100514) in 296 ms on lvs02 (80/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 87.0 in stage 392.0 (TID 100521, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 81.0 in stage 392.0 (TID 100515) in 292 ms on lvs02 (81/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 88.0 in stage 392.0 (TID 100522, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 82.0 in stage 392.0 (TID 100516) in 331 ms on lvs02 (82/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 89.0 in stage 392.0 (TID 100523, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 83.0 in stage 392.0 (TID 100517) in 271 ms on lvs02 (83/291) 15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996200 in memory on lvs02:38954 (size: 31.0 KB, free: 1073.7 MB) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 90.0 in stage 392.0 (TID 100524, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 79.0 in stage 392.0 (TID 100513) in 549 ms on lvs02 (84/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 91.0 in stage 392.0 (TID 100525, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 84.0 in stage 392.0 (TID 100518) in 327 ms on lvs02 (85/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 92.0 in stage 392.0 (TID 100526, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 86.0 in stage 392.0 (TID 100520) in 293 ms on lvs02 (86/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 93.0 in stage 392.0 (TID 100527, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 87.0 in stage 392.0 (TID 100521) in 257 ms on lvs02 (87/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 94.0 in stage 392.0 (TID 100528, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 
15:16:36 INFO TaskSetManager: Finished task 90.0 in stage 392.0 (TID 100524) in 244 ms on lvs02 (88/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 95.0 in stage 392.0 (TID 100529, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 85.0 in stage 392.0 (TID 100519) in 455 ms on lvs02 (89/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 96.0 in stage 392.0 (TID 100530, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 89.0 in stage 392.0 (TID 100523) in 358 ms on lvs02 (90/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 97.0 in stage 392.0 (TID 100531, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 88.0 in stage 392.0 (TID 100522) in 548 ms on lvs02 (91/291) 15/03/18
Re: Idempotent count
Hi Arush, Thank you for answering! When you say checkpoints hold metadata and data, what is the data? Is it the data that is pulled from the input source, or is it the state? If it is the state, then is it the same number of records that I have aggregated since the beginning, or only a subset of it? How can I limit the size of the state that is kept in a checkpoint? Thank you -Binh On Tue, Mar 17, 2015 at 11:47 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi Yes, Spark Streaming is capable of stateful stream processing. With or without state is a way of classifying streaming computations. Checkpoints hold metadata and data. Thanks On Wed, Mar 18, 2015 at 4:00 AM, Binh Nguyen Van binhn...@gmail.com wrote: Hi all, I am new to Spark, so please forgive me if my question is stupid. I am trying to use Spark Streaming in an application that reads data from a queue (Kafka), does some aggregation (sum, count, ...) and then persists the result to an external storage system (MySQL, VoltDB, ...). From my understanding of Spark Streaming, I can have two ways of doing aggregation: - Stateless: I don't have to keep state and just apply new delta values to the external system. From my understanding, doing it this way I may end up over-counting when there is a failure and replay. - Stateful: Use a checkpoint to keep state and blindly save the new state to the external system. Doing it this way I have a correct aggregation result, but I have to keep data in two places (the state and the external system). My questions are: - Is my understanding of stateless and stateful aggregation correct? If not, please correct me! - For stateful aggregation, what does Spark Streaming keep when it saves a checkpoint? Please kindly help! Thanks -Binh -- Sigmoid Analytics http://www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
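The stateless/stateful trade-off described above can be sketched without Spark at all. The helper names apply_delta and upsert_state below are made up for illustration, and a dict stands in for both the checkpointed state and the external store; the point is that replaying a batch double-counts with deltas but is harmless with a state-then-overwrite ("upsert") scheme.

```python
# Plain-Python sketch (no Spark) of the two aggregation strategies.

def apply_delta(external, batch):
    """Stateless: add each batch's delta to the external store.
    Replaying a batch after a failure counts it twice."""
    for key, value in batch:
        external[key] = external.get(key, 0) + value

def upsert_state(external, state, batch):
    """Stateful: fold the batch into checkpointed state, then blindly
    overwrite the external store with the new totals. Replaying a batch
    from the same checkpointed state recomputes the same totals, so the
    write is idempotent."""
    for key, value in batch:
        state[key] = state.get(key, 0) + value
    external.update(state)

batch = [("clicks", 3), ("clicks", 2)]   # batch total: 5

ext1 = {}
apply_delta(ext1, batch)
apply_delta(ext1, batch)                 # replay after failure -> over-counted
print(ext1)                              # {'clicks': 10}

ext2 = {}
upsert_state(ext2, {}, batch)            # state restored from checkpoint: {}
upsert_state(ext2, {}, batch)            # replay writes the same totals again
print(ext2)                              # {'clicks': 5}
```

In Spark Streaming the "state" dict corresponds to what updateStateByKey keeps and checkpoints for you; the external write stays correct under replay only because it overwrites totals rather than adding deltas.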
RE: [spark-streaming] can shuffle write to disk be disabled?
Yeah, as I said, your job processing time is much larger than the sliding window, and streaming jobs are executed one by one in sequence, so the next job will wait until the first job is finished and the total latency will accumulate. I think you need to identify the bottleneck of your job first. If the shuffle is slow, you could enlarge the shuffle fraction of memory (spark.shuffle.memoryFraction) to reduce the spill, but in the end the shuffle data will be written to disk; this cannot be disabled, unless you mount your spark.local.dir on a ramdisk. Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster. Thanks Jerry From: Darren Hoo [mailto:darren@gmail.com] Sent: Wednesday, March 18, 2015 3:24 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: Re: [spark-streaming] can shuffle write to disk be disabled? Hi, Saisai

Here is the duration of one of the jobs, 22 seconds in total; it is longer than the sliding window.

Stage Id  Description                    Submitted            Duration  Tasks: Succeeded/Total  Input    Shuffle Read  Shuffle Write
342       foreach at SimpleApp.scala:58  2015/03/18 15:06:58  16 s      288/288                          10.6 MB
341       window at SimpleApp.scala:51   2015/03/18 15:06:52  6 s       288/288                 12.3 MB                10.6 MB

And part of the driver log:

15/03/18 15:16:36 INFO DStreamGraph: Cleared checkpoint data for time 1426662996000 ms
15/03/18 15:16:36 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1426662932000 ms)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 81.0 in stage 392.0 (TID 100515, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 75.0 in stage 392.0 (TID 100509) in 370 ms on lvs02 (75/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 82.0 in stage 392.0 (TID 100516, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 77.0 in stage 392.0 (TID 100511) in 261 ms on lvs02 (76/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 83.0 in stage 392.0 (TID 100517, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 78.0 in stage 392.0 (TID 100512) in 274 ms on lvs02 (77/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 84.0 in stage 392.0 (TID 100518, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 74.0 in stage 392.0 (TID 100508) in 569 ms on lvs02 (78/291) 15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996000 in memory on lvs02:38954 (size: 398.3 KB, free: 1073.7 MB) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 85.0 in stage 392.0 (TID 100519, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 76.0 in stage 392.0 (TID 100510) in 539 ms on lvs02 (79/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 86.0 in stage 392.0 (TID 100520, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 80.0 in stage 392.0 (TID 100514) in 296 ms on lvs02 (80/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 87.0 in stage 392.0 (TID 100521, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 81.0 in stage 392.0 (TID 100515) in 292 ms on lvs02 (81/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 88.0 in stage 392.0 (TID 100522, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 82.0 in stage 392.0 (TID 100516) in 331 ms on lvs02 (82/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 89.0 in stage 392.0 (TID 100523, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 83.0 in stage 392.0 (TID 100517) in 271 ms on lvs02 (83/291) 15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996200 in memory on lvs02:38954 (size: 31.0 KB, free: 1073.7 MB) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 90.0 in stage 392.0 (TID 100524, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 79.0 in stage 392.0 (TID 100513) in 549 ms on lvs02 
(84/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 91.0 in stage 392.0 (TID 100525, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 84.0 in stage 392.0 (TID 100518) in 327 ms on lvs02 (85/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 92.0 in stage 392.0 (TID 100526, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 86.0 in stage 392.0 (TID 100520) in 293 ms on lvs02 (86/291) 15/03/18 15:16:36 INFO TaskSetManager: Starting task 93.0 in stage 392.0 (TID 100527, lvs02, PROCESS_LOCAL, 1122 bytes) 15/03/18 15:16:36 INFO TaskSetManager: Finished task 87.0 in stage 392.0 (TID 100521) in 257 ms on lvs02 (87/291) 15/03/18 15:16:36 INFO TaskSetManager:
RE: [SQL] Elasticsearch-hadoop, exception creating temporary table
It seems the elasticsearch-hadoop project was built against an old version of Spark, and you then upgraded the Spark version in the execution environment. As far as I know, the definition of StructField changed in Spark 1.2; can you confirm the version mismatch first? From: Todd Nist [mailto:tsind...@gmail.com] Sent: Thursday, March 19, 2015 7:49 AM To: user@spark.apache.org Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table I am attempting to access ElasticSearch and expose its data through SparkSQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33) at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32) at org.elasticsearch.spark.sql.ElasticsearchRelation.init(DefaultSource.scala:36) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20) at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303) at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87) at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch cluster. 
The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File
import scala.collection.JavaConversions._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD,SQLContext}
// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._
import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {
  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.executor.memory","1g")
    conf.set("spark.kryoserializer.buffer.mb","256")
    val sparkContext = new SparkContext(conf)
    sparkContext
  }

  def main(args: Array[String]) {
    val sc = sparkInit
    val sqlContext = new SQLContext(sc)
    import sqlContext._
    val start = System.currentTimeMillis()
    /*
saving or visualizing PCA
Hi, I am generating a PCA using Spark, but I don't know how to save it to disk or visualize it. Can someone give me some pointers please? Thanks -Roni
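For what it's worth, in Spark MLlib the usual route is to build a RowMatrix and call computePrincipalComponents(k), which returns a small local matrix that is cheap to collect and write out, however large the input data was. The sketch below illustrates the same idea in plain Python, with no Spark; the dataset and the file name pca_components.csv are invented for illustration. It computes the principal components of a tiny 2-D dataset (closed-form eigendecomposition of the 2x2 covariance matrix) and saves them to a CSV that any plotting tool can load for visualization.

```python
# Plain-Python sketch: PCA of 2-D data, saved to CSV (hypothetical file name).
import csv
import math

data = [(2.5, 2.4), (0.5, 0.7), (2.2, 2.9), (1.9, 2.2), (3.1, 3.0),
        (2.3, 2.7), (2.0, 1.6), (1.0, 1.1), (1.5, 1.6), (1.1, 0.9)]

n = len(data)
mean_x = sum(x for x, _ in data) / n
mean_y = sum(y for _, y in data) / n

# 2x2 sample covariance matrix of the centered data.
cxx = sum((x - mean_x) ** 2 for x, _ in data) / (n - 1)
cyy = sum((y - mean_y) ** 2 for _, y in data) / (n - 1)
cxy = sum((x - mean_x) * (y - mean_y) for x, y in data) / (n - 1)

# Eigenvalues of [[cxx, cxy], [cxy, cyy]] in closed form.
tr, det = cxx + cyy, cxx * cyy - cxy * cxy
disc = math.sqrt(tr * tr / 4 - det)
eig1, eig2 = tr / 2 + disc, tr / 2 - disc

def unit_eigenvector(lam):
    # (cxx - lam) * vx + cxy * vy = 0  ->  direction (cxy, lam - cxx)
    vx, vy = cxy, lam - cxx
    norm = math.hypot(vx, vy)
    return (vx / norm, vy / norm)

components = [unit_eigenvector(eig1), unit_eigenvector(eig2)]

# "Saving the PCA" just means persisting this small matrix.
with open("pca_components.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["eigenvalue", "vx", "vy"])
    for lam, (vx, vy) in zip([eig1, eig2], components):
        writer.writerow([lam, vx, vy])
```

The same pattern applies in Spark: the distributed part is computing the components; the result is a small driver-side matrix you save with ordinary file I/O.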
RE: saveAsTable fails to save RDD in Spark SQL 1.3.0
Sun, Just want to confirm that it was in fact an authentication issue. The issue is resolved now and I can see my tables through the Simba ODBC driver. Thanks a lot. Shahdad From: fightf...@163.com [mailto:fightf...@163.com] Sent: March-17-15 6:33 PM To: Shahdad Moradi; user Subject: Re: saveAsTable fails to save RDD in Spark SQL 1.3.0 Looks like some authentication issues. Can you check that your current user has authority to operate (maybe r/w/x) on /user/hive/warehouse? Thanks, Sun. fightf...@163.com From: smoradi smor...@currenex.com Date: 2015-03-18 09:24 To: user@spark.apache.org Subject: saveAsTable fails to save RDD in Spark SQL 1.3.0 Hi, Basically my goal is to make the Spark SQL RDDs available to Tableau software through the Simba ODBC driver. I’m running standalone Spark 1.3.0 on Ubuntu 14.04. I got the source code and compiled it with Maven. Hive is also set up and connected to MySQL, all on the same machine. The hive-site.xml file has been copied to spark/conf. Here is the content of the hive-site.xml:

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
    <description>user name for connecting to mysql server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hivepassword</value>
    <description>password for connecting to mysql server</description>
  </property>
</configuration>

Both Hive and MySQL work just fine. I can create a table with Hive and find it in MySQL. The thriftserver is also configured and connected to the Spark master.
Everything works just fine and I can monitor all the workers and running applications through the Spark master UI. I have a very simple Python script to convert a json file to an RDD like this:

import json

def transform(data):
    ts = data[:25].strip()
    jss = data[41:].strip()
    jsj = json.loads(jss)
    jsj['ts'] = ts
    return json.dumps(jsj)

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.textFile(myfile)
tbl = sqlContext.jsonRDD(rdd.map(transform))
tbl.saveAsTable("neworder")

The saveAsTable fails with this:

15/03/17 17:22:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 191, in saveAsTable
    self._jdf.saveAsTable(tableName, source, jmode, joptions)
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.saveAsTable.
: java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/user/hive/warehouse/neworder/_temporary/0/task_201503171618_0008_r_01/part-r-2.parquet; isDirectory=false; length=5591; replication=1; blocksize=33554432; modification_time=142663430; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/user/hive/warehouse/neworder/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:649) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:126) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308) at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:217) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65) at
Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
Thanks for the quick response. The spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download. Here is the startup: radtech$ ./sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out Spark assembly has been built with Hive, including Datanucleus jars on classpath Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080 15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started15/03/18 20:31:41 INFO Remoting: Starting remoting15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkmas...@radtech.io:7077]15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkmas...@radtech.io:7077]15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:707715/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:808015/03/18 20:31:41 INFO 
Master: I have been elected leader! New state: ALIVE

My build.sbt for the spark job is as follows:

import AssemblyKeys._

// activating assembly plugin
assemblySettings

name := "elasticsearch-spark"

version := "0.0.1"

val SCALA_VERSION = "2.10.4"
val SPARK_VERSION = "1.2.1"

val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "io.radtec",
  scalaVersion := SCALA_VERSION,
  resolvers := Seq(
    //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
    Resolver.typesafeRepo("releases")),
  scalacOptions ++= Seq(
    "-unchecked", "-deprecation", "-Xlint", "-Ywarn-dead-code", "-language:_",
    "-target:jvm-1.7", "-encoding", "UTF-8"
  ),
  parallelExecution in Test := false,
  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
  publishArtifact in (Test, packageBin) := true,
  unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)),
  unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)),
  EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource,
  credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
  publishTo := Some("Artifactory Realm" at "http://artifactory.ods:8082/artifactory/ivy-repo-local")
)

// custom Hadoop client, configured as provided, since it shouldn't go to assembly jar
val hadoopDeps = Seq (
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)

// ElasticSearch Hadoop support
val esHadoopDeps = Seq (
  ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT").
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude("org.apache.spark", "spark-sql_2.10").
    exclude("javax.jms", "jms")
)

val commonDeps = Seq(
  "com.eaio.uuid" % "uuid" % "3.2",
  "joda-time" % "joda-time" % "2.3",
  "org.joda" % "joda-convert" % "1.6"
)

val jsonDeps = Seq(
  "com.googlecode.json-simple" % "json-simple" % "1.1.1",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.5.1",
  "com.fasterxml.jackson.core" % "jackson-annotations" % "2.5.1",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.5.1",
  "com.fasterxml.jackson.module" % "jackson-module-jaxb-annotations" % "2.5.1",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.5.1",
  "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.5.1",
  "com.fasterxml.jackson.datatype" % "jackson-datatype-joda" % "2.5.1"
)

val commonTestDeps = Seq(
  "org.specs2" %% "specs2" % "2.3.11" % "test",
  "org.mockito" % "mockito-all" % "1.9.5" % "test",
  "org.scalacheck" %% "scalacheck" % "1.11.3" % "test",
  "org.scalatest" %% "scalatest" % "1.9.1" % "test")

// Project definitions
lazy val root = (project in file("."))
  .settings(defaultSettings:_*)
  .settings(libraryDependencies ++= Seq(
Apache Spark User List: people's responses not showing in the browser view
Sorry if this is a total noob question but is there a reason why I'm only seeing folks' responses to my posts in emails but not in the browser view under apache-spark-user-list.1001560.n3.nabble.com? Is this a matter of setting your preferences such that your responses only go to email and never to the browser-based view of the list? I don't seem to see such a preference... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-User-List-people-s-responses-not-showing-in-the-browser-view-tp22135.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: [spark-streaming] can shuffle write to disk be disabled?
Please see the inline comments. Thanks Jerry From: Darren Hoo [mailto:darren@gmail.com] Sent: Wednesday, March 18, 2015 9:30 PM To: Shao, Saisai Cc: user@spark.apache.org; Akhil Das Subject: Re: [spark-streaming] can shuffle write to disk be disabled? On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai saisai.s...@intel.com wrote: From the log you pasted I think this (-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final shuffle result. Why is the shuffle result written to disk? This is the internal mechanism for Spark. As I said, do you think shuffle is the bottleneck that makes your job run slowly? I am quite new to Spark, so I am just making wild guesses. What further information should I provide that could help find the real bottleneck? You can monitor the system metrics as well as the JVM; information from the web UI is also very useful. Maybe you should identify the cause first. Besides, from the log it looks like your memory is not enough to cache the data; maybe you should increase the memory size of the executor. Running two executors, the memory usage is quite low: executor 0 8.6 MB / 4.1 GB executor 1 23.9 MB / 4.1 GB driver 0.0 B / 529.9 MB submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G
[SQL] Elasticsearch-hadoop, exception creating temporary table
I am attempting to access ElasticSearch and expose its data through SparkSQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303) at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87) at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch cluster. The mapping looks as follows: radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping' {bank:{mappings:{account:{properties:{account_number:{type:long},address:{type:string},age:{type:long},balance:{type:long},city:{type:string},email:{type:string},employer:{type:string},firstname:{type:string},gender:{type:string},lastname:{type:string},state:{type:string}} I can read the data just fine doing the following: import java.io.File import scala.collection.JavaConversions._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SchemaRDD,SQLContext} // ES imports import org.elasticsearch.spark._ import org.elasticsearch.spark.sql._ import io.radtech.spark.utils.{Settings, Spark, ElasticSearch} object ElasticSearchReadWrite { /** * Spark specific configuration */ def sparkInit(): SparkContext = { val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master) conf.set(es.nodes, 
ElasticSearch.Nodes) conf.set(es.port, ElasticSearch.HttpPort.toString()) conf.set(es.index.auto.create, true); conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); conf.set(spark.executor.memory,1g) conf.set(spark.kryoserializer.buffer.mb,256) val sparkContext = new SparkContext(conf) sparkContext } def main(args: Array[String]) { val sc = sparkInit val sqlContext = new SQLContext(sc) import sqlContext._ val start = System.currentTimeMillis() /* * Read from ES and query with with Spark SparkSQL */ val esData = sc.esRDD(s${ElasticSearch.Index}/${ElasticSearch.Type}) esData.collect.foreach(println(_)) val end = System.currentTimeMillis() println(sTotal time: ${end-start} ms) This works fine and and prints the content of esData out as one would expect. 15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 6.897443 s (4,Map(employer - Tourmania, city - Eastvale, address - 986
Re: mapPartitions - How Does it Works
Unlike map(), where your task acts on one row at a time, with mapPartitions() the task is passed the entire content of the partition as an iterator. You can then return another iterator as the output. I don't do Scala, but from what I understand of your code snippet: the iterator x can return all the rows in the partition, but you are returning after consuming only the first row. Hence you see only 1, 4, 7 in your output; these are the first rows of each of your 3 partitions. Regards, Sab

On 18-Mar-2015 10:50 pm, ashish.usoni ashish.us...@gmail.com wrote:
I am trying to understand mapPartitions, but I am still not sure how it works. In the below example it creates three partitions:

val parallel = sc.parallelize(1 to 10, 3)

and when we do

parallel.mapPartitions(x => List(x.next).iterator).collect

it prints the value Array[Int] = Array(1, 4, 7). Can someone please explain why it prints only 1, 4, 7?

Thanks,
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does-it-Works-tp22123.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
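Sab's explanation can be checked without a cluster. Below is a minimal plain-Scala sketch; the partition boundaries are an assumption about how parallelize splits a range into contiguous slices, but they reproduce the observed Array(1, 4, 7):

```scala
// Plain-Scala simulation of the snippet above (no Spark needed). Partition
// boundaries mimic how sc.parallelize(1 to 10, 3) slices the range:
// [1,2,3], [4,5,6], [7,8,9,10].
def partitionsOf(data: Vector[Int], numParts: Int): Seq[Iterator[Int]] =
  (0 until numParts).map { i =>
    val start = i * data.length / numParts
    val end   = (i + 1) * data.length / numParts
    data.slice(start, end).iterator
  }

val parts = partitionsOf((1 to 10).toVector, 3)
// the user's function: consume only the first element of each partition's iterator
val result = parts.flatMap(it => List(it.next()).iterator).toList
println(result) // List(1, 4, 7)
```

To see every element, the function passed to mapPartitions should map over the whole iterator (e.g. `x => x.map(_ * 2)`) instead of consuming just its first element.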
iPython Notebook + Spark + Accumulo -- best practice?
hi all, I've been DDGing, Stack Overflowing, Twittering, RTFMing, and scanning through this archive with only moderate success. in other words -- my way of saying sorry if this is answered somewhere obvious and I missed it :-)

i've been tasked with figuring out how to connect Notebook, Spark, and Accumulo together. The end user will do her work via notebook. thus far, I've successfully set up a Vagrant image containing Spark, Accumulo, and Hadoop. I was able to use some of the Accumulo example code to create a table populated with data, and to create a simple program in scala that, when fired off to Spark via spark-submit, connects to accumulo and prints the first ten rows of data in the table. so w00t on that - but now I'm left with more questions:

1) I'm still stuck on what's considered 'best practice' in terms of hooking all this together. Let's say Sally, a user, wants to do some analytic work on her data. She pecks the appropriate commands into notebook and fires them off. how does this get wired together on the back end? Do I, from notebook, use spark-submit to send a job to spark and let spark worry about hooking into accumulo, or is it preferable to create some kind of open stream between the two?

2) if I want to extend spark's api, do I need to first submit an endless job via spark-submit that does something like what this gentleman describes http://blog.madhukaraphatak.com/extending-spark-api ? is there an alternative (other than refactoring spark's source) that doesn't involve extending the api via a job submission?

ultimately, what I'm looking for is help locating docs, blogs, etc. that may shed some light on this. t/y in advance! d
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/iPython-Notebook-Spark-Accumulo-best-practice-tp22137.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: iPython Notebook + Spark + Accumulo -- best practice?
Hi David, W00t indeed, and great questions. On the notebook front, there are two options depending on what you are looking for. You can either go with iPython 3 with the Spark-kernel as a backend, or you can use spark-notebook. Both have interesting tradeoffs. If you are looking for a single notebook platform for your data scientists that has R and Python as well as a Spark shell, you'll likely want to go with iPython + Spark-kernel. Downsides with the spark-kernel project are that data visualization isn't quite there yet and it's early days for documentation, blogs, etc. Upsides are that R and Python work beautifully and that the iPython committers are super-helpful. If you are OK with a primarily Spark/Scala experience, then I suggest you go with spark-notebook. Upsides are that the project is a little further along, visualization support is better than spark-kernel's (though not as good as iPython with Python), and the committer is awesome with help. Downside is that you won't get R and Python. FWIW: I'm using both at the moment! Hope that helps.

*Irfan Ahmad* CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor

On Wed, Mar 18, 2015 at 5:45 PM, davidh dav...@annaisystems.com wrote: hi all, I've been DDGing, Stack Overflowing, Twittering, RTFMing, and scanning through this archive with only moderate success. in other words -- my way of saying sorry if this is answered somewhere obvious and I missed it :-) i've been tasked with figuring out how to connect Notebook, Spark, and Accumulo together. The end user will do her work via notebook. thus far, I've successfully setup a Vagrant image containing Spark, Accumulo, and Hadoop. I was able to use some of the Accumulo example code to create a table populated with data, create a simple program in scala that, when fired off to Spark via spark-submit, connects to accumulo and prints the first ten rows of data in the table.
so w00t on that - but now I'm left with more questions: 1) I'm still stuck on what's considered 'best practice' in terms of hooking all this together. Let's say Sally, a user, wants to do some analytic work on her data. She pecks the appropriate commands into notebook and fires them off. how does this get wired together on the back end? Do I, from notebook, use spark-submit to send a job to spark and let spark worry about hooking into accumulo or is it preferable to create some kind of open stream between the two? 2) if I want to extend spark's api, do I need to first submit an endless job via spark-submit that does something like what this gentleman describes http://blog.madhukaraphatak.com/extending-spark-api ? is there an alternative (other than refactoring spark's source) that doesn't involve extending the api via a job submission? ultimately what I'm looking for help locating docs, blogs, etc that may shed some light on this. t/y in advance! d -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/iPython-Notebook-Spark-Accumulo-best-practice-tp22137.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkSQL 1.3.0 JDBC data source issues
Hi, I am trying the jdbc data source in Spark SQL 1.3.0 and found some issues. First, the syntax where str_col='value' gives an error for both PostgreSQL and MySQL:

psql> create table foo(id int primary key, name text, age int);

bash> SPARK_CLASSPATH=postgresql-9.4-1201-jdbc41.jar spark/bin/spark-shell

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX", "dbtable" -> "foo")).registerTempTable("foo")
scala> sql("select * from foo where name='bar'").collect
org.postgresql.util.PSQLException: ERROR: operator does not exist: text = bar
Hint: No operator matches the given name and argument type(s). You might need to add explicit type casts. Position: 40
scala> sql("select * from foo where name like '%foo'").collect

bash> SPARK_CLASSPATH=mysql-connector-java-5.1.34.jar spark/bin/spark-shell

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo")
scala> sql("select * from foo where name='bar'").collect
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'bar' in 'where clause'

Second, a PostgreSQL table with a json data type does not work:

psql> create table foo(id int primary key, data json);

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo")
java.sql.SQLException: Unsupported type

Not sure whether these are bugs in Spark SQL or in the JDBC data source. I can file JIRA tickets if needed. Thanks, -- Pei-Lun
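A plausible reading of both errors is that the string literal is pushed down to the database unquoted, so PostgreSQL parses bar as a column reference (text = bar) and MySQL reports an unknown column 'bar'. The sketch below is illustrative only, not Spark's actual internals; compileValue is a hypothetical name for the quoting step such a filter pushdown needs:

```scala
// Hypothetical sketch of compiling a filter value into a SQL literal.
// Without the String case, name = 'bar' would be rendered as: name = bar,
// which the database reads as a column reference -- the errors seen above.
def compileValue(value: Any): String = value match {
  case s: String => "'" + s.replace("'", "''") + "'" // quote and escape strings
  case other     => other.toString                   // numbers etc. pass through
}

val goodPredicate = s"name = ${compileValue("bar")}"
val badPredicate  = "name = bar" // what the broken pushdown appears to generate
println(goodPredicate) // name = 'bar'
```

If this is the cause, it is a Spark SQL JDBC data source bug rather than a driver issue, and a JIRA ticket with the transcripts above would be the right move.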
Re: MEMORY_ONLY vs MEMORY_AND_DISK
It depends. If the data on which the calculation is to be done is very large, then caching it with MEMORY_AND_DISK is useful, particularly when the computation on the RDD is expensive. If the computation is very cheap, then even for large data sets MEMORY_ONLY can be used. But if the data size is small, then MEMORY_ONLY is obviously the best option.

On Thu, Mar 19, 2015 at 2:35 AM, sergunok [via Apache Spark User List] ml-node+s1001560n22130...@n3.nabble.com wrote:
What persistence level is better if the RDD to be cached is heavy to recalculate? Am I right that it is MEMORY_AND_DISK?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MEMORY-ONLY-vs-MEMORY-AND-DISK-tp22130p22140.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
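The rule of thumb above can be captured in a small helper. The function below is purely illustrative (made up for this sketch, not a Spark API); in real code you would pass org.apache.spark.storage.StorageLevel values to rdd.persist(...):

```scala
// Illustrative decision helper for choosing a persistence level (not Spark API).
def chooseStorageLevel(fitsInMemory: Boolean, expensiveToRecompute: Boolean): String =
  if (fitsInMemory) "MEMORY_ONLY"                  // small data: keep it all in RAM
  else if (expensiveToRecompute) "MEMORY_AND_DISK" // big + costly: spill rather than recompute
  else "MEMORY_ONLY"                               // big + cheap: let evicted partitions recompute

println(chooseStorageLevel(fitsInMemory = false, expensiveToRecompute = true)) // MEMORY_AND_DISK
```

The underlying tradeoff: MEMORY_ONLY drops partitions that don't fit and recomputes them from lineage on access, while MEMORY_AND_DISK writes them to local disk, so disk reads replace recomputation.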
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Thanks for clarifying Todd. This may then be an issue specific to the HDP version we're using. Will continue to debug and post back if there's any resolution. On Thu, Mar 19, 2015 at 3:40 AM, Todd Nist tsind...@gmail.com wrote: Yes I believe you are correct. For the build you may need to specify the specific HDP version of hadoop to use with the -Dhadoop.version=. I went with the default 2.6.0, but Horton may have a vendor specific version that needs to go here. I know I saw a similar post today where the solution was to use -Dhadoop.version=2.5.0-cdh5.3.2 but that was for a cloudera installation. I am not sure what the HDP version would be to put here. -Todd On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional spark yarn options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. That apart, I assume that no other host on the cluster should require a deployment of the spark distribution or any other config change to support a spark job. Isn't that correct? On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file? spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. 
However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked (with spark.yarn.jar set), or is there a need to ensure that the entire distribution is made available pre-deployed on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary.

On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote:
Hi Bharath, I ran into the same issue a few days ago; here is a link to a post on Horton's forum: http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to perform this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3:
1. Pull the 1.2.1 source.
2. Apply the following patches:
a. Address the jackson version, https://github.com/apache/spark/pull/3938
b. Address the propagation of the hdp.version set in spark-defaults.conf, https://github.com/apache/spark/pull/3409
3. Build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package
Then deploy the resulting artifact, spark-1.2.1-bin-hadoop2.6.tgz, following the instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW, spark-1.3.0 appears to be working fine with HDP as well, and steps 2a and 2b are not required. HTH -Todd

On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote:
Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a yarn cluster results in the AM failing to start, with the following error on stderr:
Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster, and the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job.
Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error ? Thanks, Bharath
Re: Spark + HBase + Kerberos
Are hbase config / keytab files deployed on executor machines? Consider adding -Dsun.security.krb5.debug=true for debugging purposes. Cheers

On Wed, Mar 18, 2015 at 11:39 AM, Eric Walk eric.w...@perficient.com wrote:
Having an issue connecting to HBase from a Spark container in a secure cluster. We haven't been able to get past this issue; any thoughts would be appreciated. We're able to perform some operations like "CreateTable" in the driver thread successfully. Read requests (always in the executor threads) are always failing with:
No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
Logs and scala are attached; the names of the innocent have been masked for their protection (in a consistent manner). Executing the following spark job (using HDP 2.2, Spark 1.2.0, HBase 0.98.4, Kerberos on AD):

export SPARK_CLASSPATH=/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-server.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-protocol.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-hadoop2-compat.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-client.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/hbase-common.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/htrace-core-3.0.4.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/guava-12.0.1.jar:/usr/hdp/2.2.0.0-2041/hbase/conf

/usr/hdp/2.2.0.0-2041/spark/bin/spark-submit --class HBaseTest --driver-memory 2g --executor-memory 1g --executor-cores 1 --num-executors 1 --master yarn-client ~/spark-test_2.10-1.0.jar

We see this error in the executor processes (attached as yarn log.txt):
2015-03-18 17:34:15,121 DEBUG [Executor task launch worker-0] security.HBaseSaslRpcClient: Creating SASL GSSAPI client.
Server's Kerberos principal name is hbase/ldevawshdp0002.dc1.pvc@dc1.PVC 2015-03-18 17:34:15,128 WARN [Executor task launch worker-0] ipc.RpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] 2015-03-18 17:34:15,129 ERROR [Executor task launch worker-0] ipc.RpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'. javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] The HBase Master Logs show success: 2015-03-18 17:34:12,861 DEBUG [RpcServer.listener,port=6] ipc.RpcServer: RpcServer.listener,port=6: connection from 10.4.0.6:46636; # active connections: 3 2015-03-18 17:34:12,872 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Kerberos principal name is hbase/ldevawshdp0001.dc1.pvc@ DC1.PVC 2015-03-18 17:34:12,875 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Created SASL server with mechanism = GSSAPI 2015-03-18 17:34:12,875 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Have read input token of size 1501 for processing by saslServer.evaluateResponse() 2015-03-18 17:34:12,876 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Will send token of size 108 from saslServer. 2015-03-18 17:34:12,877 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Have read input token of size 0 for processing by saslServer.evaluateResponse() 2015-03-18 17:34:12,878 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Will send token of size 32 from saslServer. 
2015-03-18 17:34:12,878 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: Have read input token of size 32 for processing by saslServer.evaluateResponse() 2015-03-18 17:34:12,879 DEBUG [RpcServer.reader=3,port=6] security.HBaseSaslRpcServer: SASL server GSSAPI callback: setting canonicalized client ID: user1@DC1.PVC 2015-03-18 17:34:12,895 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: SASL server context established. Authenticated client: user1@DC1.PVC (auth:SIMPLE). Negotiated QoP is auth 2015-03-18 17:34:29,313 DEBUG [RpcServer.reader=3,port=6] ipc.RpcServer: RpcServer.listener,port=6: DISCONNECTING client 10.4.0.6:46636 because read count=-1. Number of active connections: 3 2015-03-18 17:34:37,102 DEBUG [RpcServer.listener,port=6] ipc.RpcServer: RpcServer.listener,port=6: connection from 10.4.0.6:46733; # active connections: 3 2015-03-18 17:34:37,102 DEBUG [RpcServer.reader=4,port=6] ipc.RpcServer: RpcServer.listener,port=6: DISCONNECTING client 10.4.0.6:46733 because read count=-1. Number of active connections: 3 The Spark Driver Console Output hangs at this point: 2015-03-18 17:34:13,337 INFO [main] spark.DefaultExecutionContext: Starting job: count at HBaseTest.scala:63 2015-03-18 17:34:13,349 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.DAGScheduler: Got job 0 (count at HBaseTest.scala:63) with 1 output partitions (allowLocal=false) 2015-03-18 17:34:13,350 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.DAGScheduler: Final
RDD pair to pair of RDDs
What's the best way to go from: RDD[(A, B)] to (RDD[A], RDD[B])? If I do:

def separate[A, B](k: RDD[(A, B)]) = (k.map(_._1), k.map(_._2))

which is the obvious solution, this runs two maps in the cluster. Can I do some kind of a fold instead:

def separate[A, B](l: List[(A, B)]) = l.foldLeft((List[A](), List[B]()))((a, b) => (b._1 :: a._1, b._2 :: a._2))

But obviously this has an aggregate component that I don't want to be running on the driver, right? Thanks, Alex
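For comparison, here are both approaches on plain Scala collections (no cluster needed). On an RDD, the two-map version is usually the practical choice; calling cache() on the pair RDD first means it is computed once rather than once per map. The fold version, as noted, collapses everything into driver-local lists:

```scala
// Two-pass version: mirrors running two maps over the RDD.
def separate[A, B](pairs: List[(A, B)]): (List[A], List[B]) =
  (pairs.map(_._1), pairs.map(_._2))

// Single-pass foldLeft version: prepends as it goes, so reverse at the
// end to restore the original order.
def separateFold[A, B](pairs: List[(A, B)]): (List[A], List[B]) = {
  val (as, bs) = pairs.foldLeft((List.empty[A], List.empty[B])) {
    case ((xs, ys), (a, b)) => (a :: xs, b :: ys)
  }
  (as.reverse, bs.reverse)
}

val pairs = List((1, "a"), (2, "b"), (3, "c"))
println(separate(pairs))     // (List(1, 2, 3),List(a, b, c))
println(separateFold(pairs)) // (List(1, 2, 3),List(a, b, c))
```

Note the fold builds concrete Lists in one process's memory, which is exactly the "aggregate on the driver" cost the question wants to avoid; two cached maps keep the work distributed.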
Re: Column Similarity using DIMSUM
Hi Manish, Did you try calling columnSimilarities(threshold) with different threshold values? Try threshold values of 0.1, 0.5, 1, 20, and higher. Best, Reza

On Wed, Mar 18, 2015 at 10:40 AM, Manish Gupta 8 mgupt...@sapient.com wrote:
Hi, I am running Column Similarity (All Pairs Similarity using DIMSUM) in Spark on a dataset that looks like (Entity, Attribute, Value) after transforming it to a row-oriented dense matrix format (one line per Attribute, one column per Entity, each cell holding a normalized value between 0 and 1). It runs extremely fast in computing similarities between Entities in most cases, but if there is even a single attribute which occurs frequently across the entities (say in 30% of entities), the job falls apart. The whole job gets stuck and worker nodes start running at 100% CPU without making any progress on the job stage. If the dataset is very small (in the range of 1000 Entities X 500 attributes (some frequently occurring)), the job finishes but takes too long (sometimes it gives GC errors too). If none of the attributes is frequently occurring (all 2%), then the job runs lightning fast (even for 100 Entities X 1 attributes) and the results are very accurate. I am running Spark 1.2.0-cdh5.3.0 on an 11-node cluster, each node having 4 cores and 16GB of RAM. My question is: *Is this behavior expected for datasets where some Attributes frequently occur*? Thanks, Manish Gupta
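As background for Reza's suggestion: columnSimilarities computes cosine similarities between matrix columns, and DIMSUM approximates the brute-force computation by sampling, with higher thresholds sampling more aggressively (faster, at the cost of accuracy on low-similarity pairs). A plain-Scala sketch of the exact quantity being approximated:

```scala
// Brute-force cosine similarity between every pair of matrix columns --
// the quantity DIMSUM's columnSimilarities(threshold) approximates by sampling.
def cosine(a: Seq[Double], b: Seq[Double]): Double = {
  val dot  = a.zip(b).map { case (x, y) => x * y }.sum
  val norm = (v: Seq[Double]) => math.sqrt(v.map(x => x * x).sum)
  dot / (norm(a) * norm(b))
}

// three columns; columns 0 and 1 are identical, column 2 is orthogonal to both
val cols = Vector(Seq(1.0, 0.0), Seq(1.0, 0.0), Seq(0.0, 1.0))
val sims = for (i <- cols.indices; j <- cols.indices if i < j)
  yield ((i, j), cosine(cols(i), cols(j)))
println(sims.toList) // List(((0,1),1.0), ((0,2),0.0), ((1,2),0.0))
```

The brute-force version is quadratic in the number of column pairs that share a row, which hints at why a single attribute present in 30% of entities makes the job blow up: that one dense row alone contributes on the order of (0.3 x #entities)^2 candidate pairs.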
Using a different spark jars than the one on the cluster
Hi all, I am trying to run my job, which needs spark-sql_2.11-1.3.0.jar. The cluster that I am running on is still on spark-1.2.0. I tried the following:

spark-submit --class class-name --num-executors 100 --master yarn application_jar --jars hdfs:///path/spark-sql_2.11-1.3.0.jar hdfs:///input_data

But this did not work; I get an error that it is not able to find a class/method that is in spark-sql_2.11-1.3.0.jar:

org.apache.spark.sql.SQLContext.implicits()Lorg/apache/spark/sql/SQLContext$implicits$

The question in general is: how do we use a different version of the spark jars (spark-core, spark-sql, spark-ml, etc.) than the ones running on the cluster? Thanks, Jay
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-different-spark-jars-than-the-one-on-the-cluster-tp22125.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming S3 Performance Implications
Hi All, I am pushing data from a Kinesis stream to S3 using Spark Streaming and noticed that during testing (i.e. master=local[2]) the batches (1 second intervals) were falling behind the incoming data stream at about 5-10 events / second. It seems that the rdd.saveAsTextFile("s3n://...") is taking a few seconds to complete.

val saveFunc = (rdd: RDD[String], time: Time) => {
  val count = rdd.count()
  if (count > 0) {
    val s3BucketInterval = time.milliseconds.toString
    rdd.saveAsTextFile("s3n://...")
  }
}

dataStream.foreachRDD(saveFunc)

Should I expect the same behaviour in a deployed cluster? Or does rdd.saveAsTextFile("s3n://...") distribute the push work to each worker node? The docs say: "Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file." Thanks, Mike.
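On the distribution question: saveAsTextFile runs as tasks on the executors, with each partition written out as its own part-file, so on a real cluster the S3 uploads happen in parallel across workers rather than being funneled through the driver. A small sketch of the resulting layout, assuming the standard Hadoop part-file naming:

```scala
// One part-file per partition, named part-00000, part-00001, ... (standard
// Hadoop output naming); each file is written by the worker holding that partition.
def partFileNames(numPartitions: Int): Seq[String] =
  (0 until numPartitions).map(i => f"part-$i%05d")

println(partFileNames(3)) // Vector(part-00000, part-00001, part-00002)
```

In local[2] mode, only two cores share both the receiving and the writing work, which by itself can explain falling behind; on a cluster, reducing the partition count with rdd.coalesce(n) before saving also caps the number of S3 PUT requests per batch.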
Re: StorageLevel: OFF_HEAP
I just tested with Spark-1.3.0 + Tachyon-0.6.0 and still see the same issue. Here are the logs: 15/03/18 11:44:11 ERROR : Invalid method name: 'getDataFolder' 15/03/18 11:44:11 ERROR : Invalid method name: 'user_getFileId' 15/03/18 11:44:11 ERROR storage.TachyonBlockManager: Failed 10 attempts to create tachyon dir in /tmp_spark_tachyon/spark-e3538a20-5e42-48a4-ad67-4b97aded90e4/driver Thanks for any other pointers. - Ranga On Wed, Mar 18, 2015 at 9:53 AM, Ranga sra...@gmail.com wrote: Thanks for the information. Will rebuild with 0.6.0 till the patch is merged. On Tue, Mar 17, 2015 at 7:24 PM, Ted Yu yuzhih...@gmail.com wrote: Ranga: Take a look at https://github.com/apache/spark/pull/4867 Cheers On Tue, Mar 17, 2015 at 6:08 PM, fightf...@163.com fightf...@163.com wrote: Hi, Ranga That's true. Typically a version mis-match issue. Note that spark 1.2.1 has tachyon built in with version 0.5.0 , I think you may need to rebuild spark with your current tachyon release. We had used tachyon for several of our spark projects in a production environment. Thanks, Sun. -- fightf...@163.com *From:* Ranga sra...@gmail.com *Date:* 2015-03-18 06:45 *To:* user@spark.apache.org *Subject:* StorageLevel: OFF_HEAP Hi I am trying to use the OFF_HEAP storage level in my Spark (1.2.1) cluster. The Tachyon (0.6.0-SNAPSHOT) nodes seem to be up and running. However, when I try to persist the RDD, I get the following error: ERROR [2015-03-16 22:22:54,017] ({Executor task launch worker-0} TachyonFS.java[connect]:364) - Invalid method name: 'getUserUnderfsTempFolder' ERROR [2015-03-16 22:22:54,050] ({Executor task launch worker-0} TachyonFS.java[getFileId]:1020) - Invalid method name: 'user_getFileId' Is this because of a version mis-match? On a different note, I was wondering if Tachyon has been used in a production environment by anybody in this group? Appreciate your help with this. - Ranga
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Yes, I believe you are correct. For the build you may need to specify the specific HDP version of hadoop to use with the -Dhadoop.version= flag. I went with the default 2.6.0, but Horton may have a vendor-specific version that needs to go here. I saw a similar post today where the solution was to use -Dhadoop.version=2.5.0-cdh5.3.2, but that was for a Cloudera installation. I am not sure what the HDP version to put here would be. -Todd On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional spark yarn options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. Apart from that, I assume that no other host on the cluster should require a deployment of the spark distribution or any other config change to support a spark job. Isn't that correct? On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file? spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked (with spark.yarn.jar set), or is there a need to ensure that the entire distribution is made available pre-deployed on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary.
On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago; here is a link to a post on Hortonworks' forum: http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to perform this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull the 1.2.1 source. 2. Apply the following patches: a. Address the jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in spark-defaults.conf, https://github.com/apache/spark/pull/3409 3. Build with: $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact, spark-1.2.1-bin-hadoop2.6.tgz, following the instructions in the HDP Spark preview: http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW, spark-1.3.0 appears to be working fine with HDP as well, and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run spark (1.2.1 built for hdp 2.2) against a yarn cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster; the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error?
Thanks, Bharath
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
Hey Dmitriy, thanks for sharing your solution. I have some more updates. The problem comes out when shuffle is involved. Using coalesce with shuffle=true behaves like reduceByKey + a smaller number of partitions, except that the whole save stage hangs. I am not sure yet if it only happens with UnionRDD or also for cogroup-like operations. Changing spark.shuffle.blockTransferService to use nio (the default pre-1.2) solves the problem. So it looks like this problem arises with the new netty-based implementation. 2015-03-18 1:26 GMT+01:00 Dmitriy Lyubimov dlie...@gmail.com: FWIW observed similar behavior in a similar situation. Was able to work around by forcefully committing one of the rdds into cache right before the union, and forcing that by executing take(1). Nothing else ever helped. Seems like a yet-undiscovered 1.2.x thing. On Tue, Mar 17, 2015 at 4:21 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Doing the reduceByKey without changing the number of partitions and then doing a coalesce works. But the other version still hangs, without any information (while it worked with spark 1.1.1). The previous logs don't seem to be related to what happens. I don't think this is a memory issue, as the GC time remains low and the shuffle read is small. My guess is that it might be related to a high number of initial partitions, but in that case shouldn't it fail for coalesce too...? Does anyone have an idea where to look to find the source of the problem? Thanks, Eugen 2015-03-13 19:18 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com: Hum, increased it to 1024 but it doesn't help, still have the same problem :( 2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com: The default one, 0.07 of executor memory. I'll try increasing it and post back the result. Thanks 2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com: Might be related: what's the value for spark.yarn.executor.memoryOverhead ?
See SPARK-6085 Cheers On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Hi, I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1. Strange thing: the exact same code does work (after the upgrade) in the spark-shell. But this information might be misleading, as it works with 1.1.1... The job takes as input two data sets: - rdd A of +170gb (with less it is hard to reproduce) and more than 11K partitions - rdd B of +100mb and 32 partitions I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am not sure the executor config is relevant here. Anyway, I tried with multiple small executors with less ram, and the inverse. The job basically does this: A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save After the flatMap, rdd A's size is much smaller, similar to B's. Configs I used to run this job: storage.memoryFraction: 0 shuffle.memoryFraction: 0.5 akka.timeout 500 akka.frameSize 40 // this one also defines the memory used by the yarn master, but not sure if it is important here driver.memory 5g executor.memory 4250m I have 7 executors with 2 cores. What happens: The job produces two stages: keyBy and save. The keyBy stage runs fine and produces a shuffle write of ~150mb. The save stage, where the shuffle read occurs, hangs. The greater the initial dataset, the more tasks hang. I did run it for much larger datasets with the same config/cluster but without doing the union, and it worked fine. Some more infos and logs: Amongst the 4 nodes, 1 finished all its tasks and the running ones are on the 3 other nodes. But I'm not sure this is useful information (one node that completed all its work vs the others), as with some smaller datasets I managed to get only one hanging task. Here are the last parts of the executor logs that show some timeouts.
An executor from node ip-10-182-98-220 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 66 ms 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in connection from /10.181.48.153:56806 java.io.IOException: Connection timed out An executor from node ip-10-181-103-186 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 20 ms 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in connection from /10.182.98.220:38784 java.io.IOException: Connection timed out An executor from node ip-10-181-48-153 (all the logs bellow belong this node) 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 (TID 13860). 802 bytes result sent to driver 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in connection from /10.181.103.186:46381 java.io.IOException: Connection timed out Followed by many 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result
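The workaround Eugen describes at the top of this thread is a single configuration entry, switching the shuffle block transfer service from the new netty implementation back to the pre-1.2 nio one. A sketch of the relevant spark-defaults.conf line (property name as it existed in the Spark 1.2.x line; it can equally be passed via --conf on spark-submit):

```
spark.shuffle.blockTransferService   nio
```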
Spark and Morphlines, parallelization, multithreading
Still a Spark noob grappling with the concepts... I'm trying to grok the idea of integrating something like the Morphlines pipelining library with Spark (or Spark Streaming). The Kite/Morphlines doc states that the runtime executes all commands of a given morphline in the same thread... there are no queues, no handoffs among threads, no context switches and no serialization between commands, which minimizes performance overheads. Further: There is no need for a morphline to manage multiple processes, nodes, or threads because this is already addressed by host systems such as MapReduce, Flume, Spark or Storm. My question is, how exactly does Spark manage the parallelization and multi-threading aspects of RDD processing? As I understand it, each collection of data is split into partitions and each partition is sent over to a slave machine to perform computations. So, for each data partition, how many processes are created? And for each process, how many threads? Knowing that would help me understand how to structure the following:

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

Would I want to create a morphline in a 'messages.foreachRDD' block? Then invoke the morphline on each messageBody? What will Spark be doing behind the scenes as far as multiple processes and multiple threads? Should I rely on it to optimize performance with multiple threads and not worry about plugging in a multi-threaded pipelining engine? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Morphlines-parallelization-multithreading-tp22134.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
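For context on the execution model being asked about: Spark runs one task per partition, each task single-threaded, with an executor process running up to one concurrent task per core. That is why a single-threaded pipeline like a morphline fits naturally when instantiated once per partition. The following is a toy pure-Python model of that scheme (not Spark's scheduler, and ToyPipeline is a hypothetical stand-in for a morphline):

```python
from concurrent.futures import ThreadPoolExecutor

class ToyPipeline:
    """Stand-in for a single-threaded pipeline (e.g. a morphline):
    one instance per partition, fed record-by-record on one thread."""
    def process(self, record):
        return record.upper()

def run_partition(partition):
    # Spark-like model: the task for this partition builds its own
    # pipeline and feeds it every record sequentially, on one thread.
    pipeline = ToyPipeline()
    return [pipeline.process(r) for r in partition]

partitions = [["a", "b"], ["c", "d"], ["e"]]
# An executor with N cores runs up to N partition-tasks concurrently;
# here max_workers plays the role of the core count.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(run_partition, partitions))
print(results)  # [['A', 'B'], ['C', 'D'], ['E']]
```

In Spark terms this corresponds to building the pipeline inside a mapPartitions-style function rather than once per record, so the per-record path stays single-threaded and handoff-free, as the Morphlines doc intends.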
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: StorageLevel: OFF_HEAP
Did you recompile it with Tachyon 0.6.0? Also, Tachyon 0.6.1 has been released this morning: http://tachyon-project.org/ ; https://github.com/amplab/tachyon/releases Best regards, Haoyuan -- Haoyuan Li AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
Re: Using a different spark jars than the one on the cluster
Since you're using YARN, you should be able to download a Spark 1.3.0 tarball from Spark's website and use spark-submit from that installation to launch your app against the YARN cluster. So effectively you would have 1.2.0 and 1.3.0 side by side in your cluster. On Wed, Mar 18, 2015 at 11:09 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to run my job which needs spark-sql_2.11-1.3.0.jar. The cluster that I am running on is still on spark-1.2.0. I tried the following: spark-submit --class class-name --num-executors 100 --master yarn application_jar --jars hdfs:///path/spark-sql_2.11-1.3.0.jar hdfs:///input_data But this did not work; I get an error that it is not able to find a class/method that is in spark-sql_2.11-1.3.0.jar: org.apache.spark.sql.SQLContext.implicits()Lorg/apache/spark/sql/SQLContext$implicits$ The question in general is: how do we use a different version of the spark jars (spark-core, spark-sql, spark-ml etc.) than the ones running on a cluster? Thanks, Jay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-different-spark-jars-than-the-one-on-the-cluster-tp22125.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Marcelo
Re: StorageLevel: OFF_HEAP
Thanks Ted. Will do. - Ranga
RDD ordering after map
Does map(...) preserve ordering of original RDD? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-ordering-after-map-tp22129.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
MEMORY_ONLY vs MEMORY_AND_DISK
Which persistence level is better if the RDD to be cached is expensive to recompute? Am I right that it is MEMORY_AND_DISK? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MEMORY-ONLY-vs-MEMORY-AND-DISK-tp22130.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
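For intuition on the trade-off being asked about: with MEMORY_ONLY, partitions that do not fit in memory are dropped and recomputed from lineage on the next access, whereas MEMORY_AND_DISK spills them to local disk and reads them back. A toy Python cost model (hypothetical numbers, not Spark internals) of why an expensive-to-recompute RDD generally favors MEMORY_AND_DISK:

```python
# Toy cost model: accessing a partition that no longer fits in memory
# either triggers recomputation (MEMORY_ONLY) or a disk read
# (MEMORY_AND_DISK). The constants are illustrative only.
RECOMPUTE_COST = 100   # expensive lineage, e.g. a large shuffle upstream
DISK_READ_COST = 5

def access_cost(evicted, level):
    if not evicted:
        return 0                      # still cached in memory: free
    if level == "MEMORY_ONLY":
        return RECOMPUTE_COST         # partition was dropped; recompute it
    if level == "MEMORY_AND_DISK":
        return DISK_READ_COST         # partition was spilled; read it back

# Re-accessing one evicted partition under each level:
print(access_cost(True, "MEMORY_ONLY"))      # 100
print(access_cost(True, "MEMORY_AND_DISK"))  # 5
```

The flip side is that if recomputation is cheaper than the disk round-trip (a trivially derived RDD), MEMORY_ONLY can win; the answer depends on the lineage cost.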
Re: StorageLevel: OFF_HEAP
Hi Haoyuan No. I assumed that Spark-1.3.0 was already built with Tachyon-0.6.0. If not, I can rebuild and try. Could you let me know how to rebuild with 0.6.0? Thanks for your help. - Ranga
Re: StorageLevel: OFF_HEAP
Ranga: Please apply the patch from: https://github.com/apache/spark/pull/4867 And rebuild Spark - the build would use Tachyon-0.6.1 Cheers
Re: RDD ordering after map
Hi, Yes, ordering is preserved with map. Shuffles break ordering. Burak On Wed, Mar 18, 2015 at 2:02 PM, sergunok ser...@gmail.com wrote: Does map(...) preserve ordering of original RDD? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-ordering-after-map-tp22129.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
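To illustrate the answer with a toy pure-Python model (not Spark itself): map is applied element-by-element within each partition without moving records between partitions, so both the partition order and the order inside each partition are unchanged; only shuffling operations (repartition, reduceByKey, etc.) can reorder data:

```python
def rdd_map(partitions, f):
    """Toy model of RDD.map: applied per partition, in order,
    without moving records between partitions."""
    return [[f(x) for x in part] for part in partitions]

partitions = [[1, 2], [3, 4], [5]]
mapped = rdd_map(partitions, lambda x: x * 10)
print(mapped)  # [[10, 20], [30, 40], [50]]

# Flattening preserves the original element order end-to-end:
flat = [x for part in mapped for x in part]
print(flat)    # [10, 20, 30, 40, 50]
```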
Does newly-released LDA (Latent Dirichlet Allocation) algorithm supports ngrams?
I wonder whether the newly-released LDA (Latent Dirichlet Allocation) algorithm only supports unigrams, or whether it can also support bi/tri-grams. If it can, can someone help me with how to use them? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-newly-released-LDA-Latent-Dirichlet-Allocation-algorithm-supports-ngrams-tp22131.html Sent from the Apache Spark User List mailing list archive at Nabble.com.