Re: Akka Connection refused - standalone cluster using spark-0.9.0
Same here, got stuck at this point. Any hints on what might be going on? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Akka-Connection-refused-standalone-cluster-using-spark-0-9-0-tp1297p6463.html
Problem using Spark with Hbase
Hi all, I am facing issues while using Spark with HBase. I am getting a NullPointerException at org.apache.hadoop.hbase.TableName.valueOf (TableName.java:288). Can someone please help to resolve this issue? What am I missing? I am using the following snippet of code:

    Configuration config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.znode.parent", "hostname1");
    config.set("hbase.zookeeper.quorum", "hostname1");
    config.set("hbase.zookeeper.property.clientPort", "2181");
    config.set("hbase.master", "hostname1:
    config.set("fs.defaultFS", "hdfs://hostname1/");
    config.set("dfs.namenode.rpc-address", "hostname1:8020");
    config.set(TableInputFormat.INPUT_TABLE, tableName);
    JavaSparkContext ctx = new JavaSparkContext(args[0], "Simple",
        System.getenv("sparkHome"), JavaSparkContext.jarOfClass(Simple.class));
    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = ctx.newAPIHadoopRDD(
        config, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    Map<ImmutableBytesWritable, Result> rddMap = hBaseRDD.collectAsMap();

But when I go to the Spark cluster and check the logs, I see the following error:

    INFO NewHadoopRDD: Input split: w3-target1.nm.flipkart.com:,
    14/05/28 16:48:51 ERROR TableInputFormat: java.lang.NullPointerException
        at org.apache.hadoop.hbase.TableName.valueOf(TableName.java:288)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:154)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:99)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:92)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Thanks, -Vibhor
Inter and Intra Cluster Density in KMeans
Hi, I wanted to calculate the inter-cluster density and intra-cluster density of the clusters generated by KMeans. How can I achieve that? Is there any existing code/API to use for this purpose? Thanks Stuti Awasthi
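As far as I know there is no ready-made API for these measures in MLlib; below is a rough sketch (Scala) of computing them by hand. It assumes `points` is an RDD[Array[Double]] and that the fitted model exposes its centers as plain arrays via `model.clusterCenters`; both names are meant as illustrations, not a confirmed API:

    import org.apache.spark.SparkContext._   // for mean() on RDD[Double]

    def dist(a: Array[Double], b: Array[Double]): Double =
      math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

    val centers: Array[Array[Double]] = model.clusterCenters

    // Intra-cluster density: mean distance from each point to its closest center.
    val intra = points.map(p => centers.map(c => dist(p, c)).min).mean()

    // Inter-cluster density: mean pairwise distance between distinct centers.
    val pairs = for (i <- centers.indices; j <- (i + 1) until centers.length)
                yield dist(centers(i), centers(j))
    val inter = pairs.sum / pairs.size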
Re: Akka Connection refused - standalone cluster using spark-0.9.0
I've been playing with the amplab docker scripts and I needed to set spark.driver.host to the driver host IP, i.e. one that all Spark processes can reach. On May 28, 2014, at 4:35 AM, jaranda jordi.ara...@bsc.es wrote: Same here, got stuck at this point. Any hints on what might be going on? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Akka-Connection-refused-standalone-cluster-using-spark-0-9-0-tp1297p6463.html
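For reference, a minimal sketch (Scala) of that fix; the master URL and the IP are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")
      .setAppName("My app")
      .set("spark.driver.host", "10.0.0.5") // an address every Spark process can reach
    val sc = new SparkContext(conf)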
Re: Writing RDDs from Python Spark program (pyspark) to HBase
It's not possible currently to write anything other than text (or pickle files, I think, in 1.0.0 or, if not, then in 1.0.1) from PySpark. I have an outstanding pull request to add reading any InputFormat from PySpark, and after that is in I will look into OutputFormat too. What does your data look like? Any details about your use case that you could share would aid the design of this feature. N On Wed, May 28, 2014 at 3:00 PM, gaurav.dasgupta gaurav.d...@gmail.com wrote: Hi, I am unable to understand how to write data directly to an HBase table from a Spark (pyspark) Python program. Is this possible in the current Spark releases? If so, can someone provide an example code snippet to do this? Thanks in advance. Regards, Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-RDDs-from-Python-Spark-progrma-pyspark-to-HBase-tp6469.html
Reading bz2 files that do not end with .bz2
Hi, I have a bunch of files that are bz2 compressed but do not have the extension .bz2. Is there any way to force Spark to read them as bz2 files using sc.textFile? FYI, if I add the .bz2 extension to the file it works fine, but the process that creates those files can't do that, and I'd like to find another way to make this work rather than renaming all the files before executing my Spark job. Thanks Regards Laurent -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Reading-bz2-files-that-do-not-end-with-bz2-tp6473.html
Re: Reading bz2 files that do not end with .bz2
You can use the Hadoop API directly: provide your own input/output reader and the Hadoop configuration to read the data. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, May 28, 2014 at 7:22 PM, Laurent T laurent.thou...@ldmobile.net wrote: Hi, I have a bunch of files that are bz2 compressed but do not have the extension .bz2. Is there any way to force Spark to read them as bz2 files using sc.textFile? ...
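For example, one way to bypass the extension-based codec lookup entirely is to open each file through Hadoop's codec API yourself. A rough sketch (Scala) with illustrative paths, not a drop-in solution:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.compress.BZip2Codec
    import org.apache.hadoop.util.ReflectionUtils
    import scala.io.Source

    // Decompress one file through the bzip2 codec, ignoring its extension.
    def readBz2(uri: String): Iterator[String] = {
      val conf = new Configuration()
      val codec = ReflectionUtils.newInstance(classOf[BZip2Codec], conf)
      val fs = FileSystem.get(new URI(uri), conf)
      val in = codec.createInputStream(fs.open(new Path(uri)))
      Source.fromInputStream(in).getLines()
    }

    // Parallelize the file list and decode on the workers.
    val files = Seq("hdfs:///data/part-00000", "hdfs:///data/part-00001")
    val lines = sc.parallelize(files).flatMap(readBz2)

This loses input-split parallelism within a file (each file becomes one task), but avoids renaming anything.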
Re: Problem using Spark with Hbase
Anyone who has used Spark this way or has faced a similar issue, please help. Thanks, -Vibhor On Wed, May 28, 2014 at 6:03 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi all, I am facing issues while using Spark with HBase. I am getting a NullPointerException at org.apache.hadoop.hbase.TableName.valueOf (TableName.java:288). Can someone please help to resolve this issue? ...
RE: GraphX partition problem
Hi Ankur, We've built it from the git link you sent, and we don't get the exception anymore. However, we've been facing strange nondeterministic behavior from GraphX. We compute connected components on a graph of ~900K edges. We ran the Spark job several times on the same input graph and got back different components each time. Furthermore, we construct the graph from an edge list, therefore there should not be "singleton" components. In the output we see that the vast majority (like 80%) of the components have only a single vertex. Does that have something to do with the bugfix below? Can you advise on how to solve this issue? Thanks, Alex From: Ankur Dave [mailto:ankurd...@gmail.com] Sent: Thursday, May 22, 2014 6:59 PM To: user@spark.apache.org Subject: Re: GraphX partition problem The fix will be included in Spark 1.0, but if you just want to apply the fix to 0.9.1, here's a hotfixed version of 0.9.1 that only includes PR #367: https://github.com/ankurdave/spark/tree/v0.9.1-handle-empty-partitions. You can clone and build this. Ankur http://www.ankurdave.com/ On Thu, May 22, 2014 at 4:53 AM, Zhicharevich, Alex azhicharev...@ebay.com wrote: Hi, I'm running a simple connected components code using GraphX (version 0.9.1). My input comes from an HDFS text file partitioned into 400 parts. When I run the code on a single part or a small number of files (like 20), the code runs fine. As soon as I try to read more files (more than 30), I get an error and the job fails. From looking at the logs I see the following exception:

    java.util.NoSuchElementException: End of stream
        at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
        at org.apache.spark.graphx.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:52)
        at org.apache.spark.graphx.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:51)
        at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:456)

From searching the web, I see it's a known issue with GraphX. Here: https://github.com/apache/spark/pull/367 And here: https://github.com/apache/spark/pull/497 Are there some stable releases that include this fix? Should I clone the git repo and build it myself? How would you advise me to deal with this issue? Thanks, Alex
Re: Comprehensive Port Configuration reference?
Howdy Andrew, Here is what I ran before an application context was created (other services have been deleted):

    # netstat -l -t tcp -p --numeric-ports
    Active Internet connections (only servers)
    Proto Recv-Q Send-Q Local Address        Foreign Address  State   PID/Program name
    tcp6  0      0      10.90.17.100:        :::*             LISTEN  4762/java
    tcp6  0      0      :::8081              :::*             LISTEN  4762/java

And, then while the application context is up:

    # netstat -l -t tcp -p --numeric-ports
    Active Internet connections (only servers)
    Proto Recv-Q Send-Q Local Address        Foreign Address  State   PID/Program name
    tcp6  0      0      10.90.17.100:        :::*             LISTEN  4762/java
    tcp6  0      0      :::57286             :::*             LISTEN  3404/java
    tcp6  0      0      10.90.17.100:38118   :::*             LISTEN  3404/java
    tcp6  0      0      10.90.17.100:35530   :::*             LISTEN  3404/java
    tcp6  0      0      :::60235             :::*             LISTEN  3404/java
    tcp6  0      0      :::8081              :::*             LISTEN  4762/java

My understanding is that this says four ports are open. Are 57286 and 60235 not being used? Jacob Jacob D. Eisinger IBM Emerging Technologies jeis...@us.ibm.com - (512) 286-6075 From: Andrew Ash and...@andrewash.com To: user@spark.apache.org Date: 05/25/2014 06:25 PM Subject: Re: Comprehensive Port Configuration reference? Hi Jacob, The config option spark.history.ui.port is new for 1.0. The problem that the History Server solves is that in non-Standalone cluster deployment modes (Mesos and YARN) there is no long-lived Spark Master that can store logs and statistics about an application after it finishes. The History Server is the UI that renders logged data from applications after they complete. Read more here: https://issues.apache.org/jira/browse/SPARK-1276 and https://github.com/apache/spark/pull/204 As far as the two vs four dynamic ports, are those all listening ports? I did observe 4 ports in use, but only two of them were listening. The other two were the random ports used for responses on outbound connections, the source port of the (srcIP, srcPort, dstIP, dstPort) tuple that uniquely identifies a TCP socket. http://unix.stackexchange.com/questions/75011/how-does-the-server-find-out-what-client-port-to-send-to Thanks for taking a look through! I also realized that I had a couple mistakes with the 0.9 to 1.0 transition so appropriately documented those now as well in the updated PR. Cheers! Andrew On Fri, May 23, 2014 at 2:43 PM, Jacob Eisinger jeis...@us.ibm.com wrote: Howdy Andrew, I noticed you have a configuration item that we were not aware of: spark.history.ui.port. Is that new for 1.0? Also, we noticed that the Workers and the Drivers were opening up four dynamic ports per application context. It looks like you were seeing two. Everything else looks like it aligns! Jacob Jacob D. Eisinger IBM Emerging Technologies jeis...@us.ibm.com - (512) 286-6075 From: Andrew Ash and...@andrewash.com To: user@spark.apache.org Date: 05/23/2014 10:30 AM Subject: Re: Comprehensive Port Configuration reference? Hi everyone, I've also been interested in better understanding what ports are used where and the direction the network connections go. I've observed a running cluster and read through code, and came up with the below documentation addition. https://github.com/apache/spark/pull/856 Scott and Jacob -- it sounds like you two have pulled together some of this yourselves for writing firewall rules.
Would you mind taking a look at this pull request and confirming that it matches your observations? Wrong documentation is worse than no documentation, so I'd like to make sure this is right. Cheers, Andrew On Wed, May 7, 2014 at 10:19 AM, Mark Baker dist...@acm.org wrote: On Tue, May 6, 2014 at 9:09 AM, Jacob Eisinger jeis...@us.ibm.com wrote: In a nutshell, Spark opens up a couple of well-known ports. And then the workers and the shell open up dynamic ports for each job. These dynamic ports make securing the Spark network difficult. Indeed. Judging by the frequency with which this topic arises, this is a concern for many (myself included). I couldn't find anything in JIRA about it, but I'm curious to know whether the Spark team considers this a problem in need of a fix? Mark.
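For reference, a minimal sketch (Scala) of pinning the ports that were configurable at the time of this thread; the port numbers are illustrative, and the executor-side ports remain dynamically assigned:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .set("spark.driver.port", "51000") // the driver's Akka port (random by default)
      .set("spark.ui.port", "4040")      // the application web UI
    val sc = new SparkContext(conf)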
Re: Java RDD structure for Matrix predict?
Wisely, is mapToPair in Spark 0.9.1 or 1.0? I'm running the former and didn't see that method available. I think the issue is that predict() is expecting an RDD containing a tuple of ints and not Integers. So if I use JavaPairRDD<Object, Object> with my original code snippet, things seem to at least compile for now. On Tue, May 27, 2014 at 6:40 PM, giive chen thegi...@gmail.com wrote: Hi Sandeep I think you should use testRatings.mapToPair instead of testRatings.map. So the code should be

    JavaPairRDD<Integer, Integer> usersProducts = training.mapToPair(
        new PairFunction<Rating, Integer, Integer>() {
            public Tuple2<Integer, Integer> call(Rating r) throws Exception {
                return new Tuple2<Integer, Integer>(r.user(), r.product());
            }
        });

It works on my side. Wisely Chen On Wed, May 28, 2014 at 6:27 AM, Sandeep Parikh sand...@clusterbeep.org wrote: I've got a trained MatrixFactorizationModel via ALS.train(...) and now I'm trying to use it to predict some ratings like so:

    JavaRDD<Rating> predictions = model.predict(usersProducts.rdd());

Where usersProducts is built from an existing Ratings dataset like so:

    JavaPairRDD<Integer, Integer> usersProducts = testRatings.map(
        new PairFunction<Rating, Integer, Integer>() {
            public Tuple2<Integer, Integer> call(Rating r) throws Exception {
                return new Tuple2<Integer, Integer>(r.user(), r.product());
            }
        });

The problem is that model.predict(...) doesn't like usersProducts, claiming that the method doesn't accept an RDD of type Tuple2; however, the docs show the method signature as follows: def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] Am I missing something? The JavaRDD is just a list of Tuple2 elements, which would match the method signature, but the compiler is complaining. Thanks!
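For comparison, a sketch of the Scala-side calls this Java code mirrors (MLlib 0.9/1.0; the rank/iterations/lambda values are illustrative):

    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.rdd.RDD

    val ratings: RDD[Rating] = ???                            // training data
    val model = ALS.train(ratings, 10, 20, 0.01)              // rank, iterations, lambda
    val usersProducts = ratings.map(r => (r.user, r.product)) // RDD[(Int, Int)]
    val predictions = model.predict(usersProducts)            // RDD[Rating]

In Scala the pairs are already (Int, Int), which matches predict's expected RDD[(Int, Int)] directly.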
Integration issue between Apache Shark-0.9.1 (with in-house hive-0.11) and pre-existing CDH4.6 HIVE-0.10 server
Hi all, I have installed Apache Shark 0.9.1 on my machine, which comes bundled with the hive-0.11 version of the Hive jars. I am trying to integrate this with my pre-existing CDH-4.6 version of the Hive server, which is of version 0.10. On pointing HIVE_HOME in spark-env.sh to the Cloudera version of Hive (hive-0.10), I am getting the IPC version mismatch error below when I try to execute any SELECT query on an existing Hive table:

    shark> select * from test_table;
    27.828: [Full GC 61270K->21300K(1013632K), 0.1185540 secs]
    java.lang.RuntimeException: Server IPC version 7 cannot communicate with client version 4
        at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
        at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1362)

It seems the Shark-0.9.1 version is incompatible with the Hive-0.10 server. Please suggest a solution if anyone has encountered this issue. Thanks Bijoy
Re: rdd ordering gets scrambled
Mohit Jaggi: A workaround is to use zipWithIndex (to appear in Spark 1.0, but if you're still on 0.9.x you can swipe the code from https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala ), map it to (x => (x._2, x._1)) and then sortByKey. Spark developers: The lack of an ordering guarantee for RDDs should be better documented, and the presence of a method called first() is a bit deceiving, in my opinion, if that same first element doesn't survive a map(). On Tuesday, April 29, 2014 3:45 PM, Mohit Jaggi mohitja...@gmail.com wrote: Hi, I started with a text file (CSV) of sorted data (by first column), parsed it into Scala objects using a map operation, then used more maps to add some extra info to the data and saved it as a text file. The final text file is not sorted. What do I need to do to keep the order from the original input intact? My code looks like:

    val csvFile = sc.textFile(...) // file is CSV and ordered by first column
    val splitRdd = csvFile.map { line => line.split(",", -1) }
    val parsedRdd = splitRdd.map { parts =>
      val key = parts(0)                            // use first column as key
      val value = new MyObject(parts(0), parts(1))  // parse into Scala objects
      (key, value)
    }
    val augmentedRdd = parsedRdd.map { x =>
      val key = x._1
      val value = ...                               // add extra fields to x._2
      (key, value)
    }
    augmentedRdd.saveAsTextFile(...) // this file is not sorted

Mohit.
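A minimal sketch of that workaround (RDD.zipWithIndex arrived in Spark 1.0; on 0.9.x use the back-ported ZippedWithIndexRDD linked above):

    import org.apache.spark.SparkContext._   // for sortByKey on pair RDDs

    // Tag each element with its original position, key by that position,
    // sort, then drop the index to recover the input order.
    val restored = parsedRdd.zipWithIndex() // RDD[((K, V), Long)]
      .map { case (x, i) => (i, x) }
      .sortByKey()
      .map { case (_, x) => x }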
K-NN by efficient sparse matrix product
Hi, I'm new to Spark and Hadoop, and I'd like to know if the following problem is solvable in terms of Spark's primitives. To compute the K-nearest neighbours of an N-dimensional dataset, I can multiply my very large normalized sparse matrix by its transpose. As this yields all pairwise distance values (N x N), I can then sort each row and only keep the K highest elements for each, resulting in an N x K dense matrix. As this Quora answer suggests: http://qr.ae/v03lY rather than the row-wise dot product, which would be O(N^2), it's better to compute the sum of the column outer products, which is O(N x K^2). However, given the number of non-zero elements in the resulting matrix, it seems I could not afford to first perform the full multiplication (N x N) and then prune it afterward (N x K), so I need a way to prune it on the fly. The original algorithm I came up with is roughly this, for an input matrix M:

    for each row i:
        outer_i = [0] * N
        for j in nonzero elements of row i:
            for k in nonzero elements of col j:
                outer_i[k] += M[i][j] * M[k][j]
        nearest_i = {sort outer_i and keep best K}

which can be parallelized in an embarrassing way, i.e. each compute node can simply process a slice of the rows. Would there be a way to do something similar (or related) with Spark? Christian
Re: Spark Streaming RDD to Shark table
OK... I needed to set the JVM class path for the worker to find the fb class: env.put("SPARK_JAVA_OPTS", "-Djava.class.path=/home/myInc/hive-0.9.0-bin/lib/libfb303.jar"); Now I am seeing the following spark.httpBroadcast.uri error. What am I missing?

    java.util.NoSuchElementException: spark.httpBroadcast.uri
        at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:151)
        at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:151)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:58)
        at org.apache.spark.SparkConf.get(SparkConf.scala:151)
        at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:104)
        at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcast.scala:70)
        at org.apache.spark.broadcast.BroadcastManager.initialize(Broadcast.scala:81)
        at org.apache.spark.broadcast.BroadcastManager.<init>(Broadcast.scala:68)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:175)
        at org.apache.spark.executor.Executor.<init>(Executor.scala:110)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:56)
        . . .
    14/05/27 15:26:45 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sp...@clim2-dsv.myinc.ad.myinccorp.com:3694/user/CoarseGrainedScheduler
    14/05/27 15:26:46 ERROR CoarseGrainedExecutorBackend: Slave registration failed: Duplicate executor ID: 8

=== Full Stack: ===

    Spark Executor Command: /usr/lib/jvm/java-7-openjdk-i386/bin/java -cp :/home/myInc/spark-0.9.1-bin-hadoop1/conf:/home/myInc/spark-0.9.1-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop1.0.4.jar -Djava.library.path=/home/myInc/hive-0.9.0-bin/lib/libfb303.jar -Djava.library.path=/home/myInc/hive-0.9.0-bin/lib/libfb303.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sp...@clim2-dsv.myinc.ad.myinccorp.com:3694/user/CoarseGrainedScheduler 8 tahiti-ins.myInc.ad.myInccorp.com 1 akka.tcp://sparkwor...@tahiti-ins.myinc.ad.myinccorp.com:37841/user/Worker app-20140527152556-0029
    log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    14/05/27 15:26:44 INFO CoarseGrainedExecutorBackend: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/05/27 15:26:44 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkwor...@tahiti-ins.myinc.ad.myinccorp.com:37841/user/Worker
    14/05/27 15:26:44 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sp...@clim2-dsv.myinc.ad.myinccorp.com:3694/user/CoarseGrainedScheduler
    14/05/27 15:26:45 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkwor...@tahiti-ins.myinc.ad.myinccorp.com:37841/user/Worker
    14/05/27 15:26:45 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
    14/05/27 15:26:45 INFO Slf4jLogger: Slf4jLogger started
    14/05/27 15:26:45 INFO Remoting: Starting remoting
    14/05/27 15:26:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@tahiti-ins.myinc.ad.myinccorp.com:43488]
    14/05/27 15:26:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@tahiti-ins.myinc.ad.myinccorp.com:43488]
    14/05/27 15:26:45 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://sp...@clim2-dsv.myinc.ad.myinccorp.com:3694/user/BlockManagerMaster
    14/05/27 15:26:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140527152645-b13b
    14/05/27 15:26:45 INFO MemoryStore: MemoryStore started with capacity 297.0 MB.
    14/05/27 15:26:45 INFO ConnectionManager: Bound socket to port 55853 with id = ConnectionManagerId(tahiti-ins.myInc.ad.myInccorp.com,55853)
    14/05/27 15:26:45 INFO BlockManagerMaster: Trying to register BlockManager
    14/05/27 15:26:45 INFO BlockManagerMaster: Registered BlockManager
    14/05/27 15:26:45 ERROR OneForOneStrategy: spark.httpBroadcast.uri
    java.util.NoSuchElementException: spark.httpBroadcast.uri
        at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:151)
        at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:151)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:58)
        at org.apache.spark.SparkConf.get(SparkConf.scala:151)
        at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:104)
        at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcast.scala:70)
        at org.apache.spark.broadcast.BroadcastManager.initialize(Broadcast.scala:81)
        at
Re: Re: spark table to hive table
On Tue, May 27, 2014 at 6:08 PM, JaeBoo Jung itsjb.j...@samsung.com wrote: I already tried HiveContext as well as SqlContext. But it seems that Spark's HiveContext is not completely the same as Apache Hive. For example, SQL like 'SELECT RANK() OVER(ORDER BY VAL1 ASC) FROM TEST LIMIT 10' works fine in Apache Hive, Spark SQL doesn't support window functions yet (SPARK-1442: https://issues.apache.org/jira/browse/SPARK-1442). Sorry for the non-obvious error message!
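Until that lands, one workaround is to emulate the ranking outside SQL; a rough sketch (Scala) where `rows` and `val1` are illustrative names. Note this produces ROW_NUMBER-style ranks, so ties would need extra handling to match RANK() exactly:

    import org.apache.spark.SparkContext._   // pair-RDD operations

    val ranked = rows
      .map(r => (r.val1, r))                  // key by the ORDER BY column
      .sortByKey(ascending = true)
      .zipWithIndex()                         // (row, 0-based position)
      .map { case ((_, r), i) => (i + 1, r) } // 1-based rank with the row
    ranked.take(10)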
A Standalone App in Scala: Standalone mode issues
During the last few days I've been trying to deploy a Scala job to a standalone cluster (master + 4 workers) without much success, although it worked perfectly when launching it from the Spark shell, that is, using the Scala REPL (pretty strange, as this would mean my cluster config was actually correct). In order to test it with a simpler example, I decided to deploy this example: https://spark.apache.org/docs/0.9.0/quick-start.html#a-standalone-app-in-scala in standalone mode (master + 1 worker, same machine). Please have a look at this gist: https://gist.github.com/JordiAranda/4ee54f84dc92f02ecb8c for the cluster setup. I can't get rid of the EOFException, so I should definitely be missing something. Why does it work when setting the master config property to local[x] or launching it from the REPL, but not when setting the master config property to a Spark URL? PS: Please notice I am using the latest release (0.9.1) prebuilt for Hadoop 2. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/A-Standalone-App-in-Scala-Standalone-mode-issues-tp6493.html
Re: K-NN by efficient sparse matrix product
Thank you for your answer. Would you have by any chance some example code (even fragmentary) that I could study? On 28 May 2014 14:04, Tom Vacek minnesota...@gmail.com wrote: Maybe I should add: if you can hold the entire matrix in memory, then this is embarrassingly parallel. If not, then the complications arise. On Wed, May 28, 2014 at 1:00 PM, Tom Vacek minnesota...@gmail.com wrote: The problem with matrix multiplication is that the amount of data blows up between the mapper and the reducer, and the shuffle operation is very slow. I have not ever tried this, but the shuffle can be avoided by making use of broadcast. Say we have M = L*R. We do a column decomposition on R, and we collect rows of L to the master and broadcast them (in manageably-sized blocks). Each worker does a dot product and discards the row block when finished. In theory, this has complexity max(nnz(L)*log p, nnz(L)*n/p). I have to warn though: when I played with matrix multiplication, I was getting nowhere near serial performance. On Wed, May 28, 2014 at 11:00 AM, Christian Jauvin cjau...@gmail.com wrote: Hi, I'm new to Spark and Hadoop, and I'd like to know if the following problem is solvable in terms of Spark's primitives. ...
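A rough sketch (Scala) of the broadcast-based scheme described above, assuming the rows of L can be collected to the driver in blocks; every name here is illustrative rather than a library API:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // M = L * R. Columns of R stay distributed as (colIndex, colVector);
    // rows of L are shipped to the workers in broadcast blocks, avoiding a shuffle.
    def multiply(sc: SparkContext,
                 rowsOfL: Array[(Int, Array[Double])],
                 colsOfR: RDD[(Int, Array[Double])],
                 blockSize: Int): RDD[((Int, Int), Double)] = {
      rowsOfL.grouped(blockSize).map { block =>
        val bc = sc.broadcast(block)
        colsOfR.flatMap { case (j, col) =>
          bc.value.map { case (i, row) =>
            val dot = row.zip(col).map { case (a, b) => a * b }.sum
            ((i, j), dot)                     // one entry of the product
          }
        }
      }.reduce(_ union _)
    }

For the K-NN use case one would then aggregate these entries by row index and keep only the K best per row, rather than materializing the full N x N product.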
Re: Spark 1.0: slf4j version conflicts with pig
Remark: just including the jar built by sbt will produce the same error, i.e. this pig script will fail:

    REGISTER /usr/share/osi1/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop0.20.2-cdh3u4.jar;
    edgeList0 = LOAD '/user/rfcompton/twitter-mention-networks/bidirectional-network-current/part-r-1' USING PigStorage() AS (id1:long, id2:long, weight:int);
    ttt = LIMIT edgeList0 10;
    DUMP ttt;

On Wed, May 28, 2014 at 12:55 PM, Ryan Compton compton.r...@gmail.com wrote: It appears to be Spark 1.0 related. I made a pom.xml with a single dependency on Spark; registering the resulting jar created the error. Spark 1.0 was compiled via

    $ SPARK_HADOOP_VERSION=0.20.2-cdh3u4 sbt/sbt assembly

The pom.xml, as well as some other information, is below. The only thing that should not be standard is the inclusion of my in-house repository (it's where I host the spark jar I compiled above).

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.mycompany.app</groupId>
      <artifactId>my-app</artifactId>
      <version>1.0-SNAPSHOT</version>
      <packaging>jar</packaging>
      <name>my-app</name>
      <url>http://maven.apache.org</url>
      <properties>
        <maven.compiler.source>1.6</maven.compiler.source>
        <maven.compiler.target>1.6</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.4</scala.version>
      </properties>
      <build>
        <pluginManagement>
          <plugins>
            <plugin>
              <groupId>net.alchim31.maven</groupId>
              <artifactId>scala-maven-plugin</artifactId>
              <version>3.1.5</version>
            </plugin>
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>2.0.2</version>
            </plugin>
          </plugins>
        </pluginManagement>
        <plugins>
          <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
              <execution>
                <id>scala-compile-first</id>
                <phase>process-resources</phase>
                <goals>
                  <goal>add-source</goal>
                  <goal>compile</goal>
                </goals>
              </execution>
              <execution>
                <id>scala-test-compile</id>
                <phase>process-test-resources</phase>
                <goals>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <!-- Plugin to create a single jar that includes all dependencies -->
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4</version>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
      <repositories>
        <!-- needed for cdh build of Spark -->
        <repository>
          <id>releases</id>
          <url>10.10.1.29:8081/nexus/content/repositories/releases</url>
        </repository>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
      </repositories>
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <!-- on node29 -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-assembly</artifactId>
          <version>1.0.0-cdh3u4</version>
          <classifier>cdh3u4</classifier>
        </dependency>
        <!-- spark docs says I need hadoop-client, cdh3u3 repo no longer exists -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>0.20.2-cdh3u4</version>
        </dependency>
      </dependencies>
    </project>

Here's what I get in the dependency tree:

    [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ my-app ---
    [INFO]
Re: Invalid Class Exception
On 5/27/2014 1:28 PM, Marcelo Vanzin wrote: On Tue, May 27, 2014 at 1:05 PM, Suman Somasundar suman.somasun...@oracle.com wrote: I am running this on a Solaris machine with logical partitions. All the partitions (workers) access the same Spark folder. Can you check whether you have multiple versions of the offending class (org.apache.spark.SerializableWritable) in the classpath of your apps? Maybe you do and different nodes are loading jars in different I checked all the org.apache.spark.SerializableWritable classes and all of them have the same serialVersionUID On 5/23/2014 9:44 PM, Andrew Or wrote: That means not all of your driver and executors have the same version of Spark. Are you on a standalone EC2 cluster? If so, one way to fix this is to run the following on the master node: /root/spark-ec2/copy-dir --delete /root/spark This syncs all of Spark across your cluster, configs, jars and everything. 2014-05-23 15:20 GMT-07:00 Suman Somasundar suman.somasun...@oracle.com: Hi, I get the following exception when using Spark to run various programs. java.io.InvalidClassException: org.apache.spark.SerializableWritable; local class incompatible: stream classdesc serialVersionUID = 6301214776158303468, local class serialVersionUID = -7785455416944904980 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
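For anyone who wants to repeat that serialVersionUID check, a quick sketch (Scala) that prints the UID a given JVM resolves for the class; running it with each node's classpath would expose a mismatch:

    import java.io.ObjectStreamClass

    val cls = Class.forName("org.apache.spark.SerializableWritable")
    println(ObjectStreamClass.lookup(cls).getSerialVersionUID)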
Re: Spark 1.0: slf4j version conflicts with pig
posted a JIRA https://issues.apache.org/jira/browse/SPARK-1952 On Wed, May 28, 2014 at 1:14 PM, Ryan Compton compton.r...@gmail.com wrote: Remark: just including the jar built by sbt will produce the same error, i.e. this pig script will fail: REGISTER /usr/share/osi1/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop0.20.2-cdh3u4.jar; edgeList0 = LOAD '/user/rfcompton/twitter-mention-networks/bidirectional-network-current/part-r-1' USING PigStorage() AS (id1:long, id2:long, weight:int); ttt = LIMIT edgeList0 10; DUMP ttt; On Wed, May 28, 2014 at 12:55 PM, Ryan Compton compton.r...@gmail.com wrote: It appears to be Spark 1.0 related. I made a pom.xml with a single dependency on Spark; registering the resulting jar created the error. ...
Re: Spark Memory Bounds
Thanks! Sounds like my rough understanding was roughly right :) Definitely understand cached RDDs can add to the memory requirements. Luckily, like you mentioned, you can configure Spark to flush that to disk and bound its total size in memory via spark.storage.memoryFraction, so I have a pretty good handle on the overall RDD contribution. Thanks for all the help. Keith On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen c...@adatao.com wrote: Keith, please see inline. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Tue, May 27, 2014 at 7:22 PM, Keith Simmons ke...@pulse.io wrote: A dash of both. I want to know enough that I can reason about, rather than strictly control, the amount of memory Spark will use. If I have a big data set, I want to understand how I can design it so that Spark's memory consumption falls below my available resources. Or alternatively, if it's even possible for Spark to process a data set over a certain size. And if I run into memory problems, I want to know which knobs to turn, and how turning those knobs will affect memory consumption. In practice, to avoid OOME, a key dial we use is the size (or inversely, number) of the partitions of your dataset. Clearly there is some blow-up factor F such that, e.g., if you start out with 128MB on-disk data partitions, you would consume 128F MB of memory, both by Spark and by your closure code. Knowing this, you would want to size the partitions such that AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at F, you could do some back-of-the-envelope modeling, and/or run the job and observe empirically. It's my understanding that between certain key stages in a Spark DAG (i.e. group-by stages), Spark will serialize all data structures necessary to continue the computation at the next stage, including closures. So in theory, per machine, Spark only needs to hold the transient memory required to process the partitions assigned to the currently active tasks. Is my understanding correct? Specifically, once a key/value pair is serialized in the shuffle stage of a task, are the references to the raw Java objects released before the next task is started? Yes, that is correct in non-cached mode. At the same time, Spark also does something else optionally, which is to keep the data structures (RDDs) persistent in memory (*). As such it is possible for partitions that are not being actively worked on to be consuming memory. Spark will spill all these to local disk if they take up more memory than it is allowed to take. So the key thing to worry about is less about what Spark does (apart from overhead and, yes, the possibility of bugs that need to be fixed), and more about what your closure code does with JVM memory as a whole. If in doubt, refer back to the blow-up factor model described above. (*) this is a fundamentally differentiating feature of Spark over a range of other in-memory architectures that focus on raw-data or transient caches, which serve non-equivalent purposes when viewed from the application level. It allows for very fast access to ready-to-consume high-level data structures, as long as available RAM permits. On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.com wrote: Keith, do you mean bound as in (a) strictly control to some quantifiable limit, or (b) try to minimize the amount used by each task? If a, then that is outside the scope of Spark's memory management, which you should think of as an application-level (that is, above JVM) mechanism.
In this scope, Spark voluntarily tracks and limits the amount of memory it uses for explicitly known data structures, such as RDDs. What Spark cannot do is, e.g., control or manage the amount of JVM memory that a given piece of user code might take up. For example, I might write some closure code that allocates a large array of doubles unbeknownst to Spark. If b, then your thinking is in the right direction, although quite imperfect, because of things like the example above. We often experience OOME if we're not careful with job partitioning. What I think Spark needs to evolve to is at least to include a mechanism for application-level hints about task memory requirements. We might work on this and submit a PR for it. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote: I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following: (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory used by all currently running tasks I can bound A with spark.storage.memoryFraction and I can bound B with spark.shuffle.memoryFraction. I'm
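To put illustrative numbers on the blow-up factor model above: with 8192 MB available per worker and 4 cores, each concurrently running task gets roughly 8192 / 4 = 2048 MB, so 128 MB on-disk partitions tolerate a blow-up factor F of up to 2048 / 128 = 16; if you measure F of about 20 empirically, you would shrink the partitions to about 2048 / 20, i.e. roughly 100 MB.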
Re: GraphX partition problem
I've been trying to reproduce this but I haven't succeeded so far. For example, on the web-Google graph (https://snap.stanford.edu/data/web-Google.html), I get the expected results both on v0.9.1-handle-empty-partitions and on master:

    // Load web-Google and run connected components
    import org.apache.spark.graphx._
    val g = GraphLoader.edgeListFile(sc, "/Users/ankurdave/Downloads/web-Google.txt", minEdgePartitions = 8)
    g.vertices.count // => 875713
    val cc = g.connectedComponents.vertices.map(_._2).cache()
    cc.count // => 875713
    val counts = cc.countByValue
    counts.values.sum // => 875713
    // There should not be any single-vertex components, because we loaded an edge list
    counts.count(_._2 == 0) // => 0
    counts.count(_._2 == 1) // => 0
    counts.count(_._2 == 2) // => 783
    counts.count(_._2 == 3) // => 503
    // The 3 smallest and largest components in the graph (with nondeterministic tiebreaking)
    counts.toArray.sortBy(_._2).take(3) // => Array((418467,2), (272504,2), (719750,2))
    counts.toArray.sortBy(_._2).takeRight(3) // => Array((1363,384), (1734,404), (0,855802))

Ankur http://www.ankurdave.com/
Re: Python, Spark and HBase
Hi Nick, I finally got around to downloading and building the patch. I pulled the code from https://github.com/MLnick/spark-1/tree/pyspark-inputformats I am running on a CDH5 node. While the code in the CDH branch is different from spark master, I do believe that I have resolved any inconsistencies. When attempting to connect to an HBase table using SparkContext.newAPIHadoopFile I receive the following error: Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile does not exist in the JVM I have searched the pyspark-inputformats branch and cannot find any reference to the class org.apache.spark.api.python.PythonRDDnewAPIHadoopFile Any ideas? Also, do you have a working example of HBase access with the new code? Thanks Tommer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6502.html
Re: Python, Spark and HBase
It sounds like you made a typo in the code — perhaps you're trying to call self._jvm.PythonRDDnewAPIHadoopFile instead of self._jvm.PythonRDD.newAPIHadoopFile? There should be a dot before the new. Matei On May 28, 2014, at 5:25 PM, twizansk twiza...@gmail.com wrote: Hi Nick, I finally got around to downloading and building the patch. I pulled the code from https://github.com/MLnick/spark-1/tree/pyspark-inputformats ...
Re: Checking spark cache percentage programatically. And how to clear cache.
You can remove cached RDDs by calling unpersist() on them. You can also use SparkContext.getRDDStorageInfo to get info on cache usage, though this is a developer API so it may change in future versions. We will add a standard API eventually but this is just very closely tied to framework internals. Matei On May 28, 2014, at 5:32 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Hi, Is there a programmatic way of checking whether RDD has been 100% cached or not? I'd like to do this to have two different path ways. Additionally, how do you clear cache (e.g. if you want to cache different RDDs, and you'd like to clear an existing cached RDD). Thanks!
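A minimal sketch of both operations (Scala); the RDDInfo field names here match Spark 1.0 but, as noted, this developer API may change:

    val data = sc.textFile("hdfs:///data").cache()
    data.count()                              // materialize the cache

    // Check how much of each RDD actually made it into memory.
    sc.getRDDStorageInfo.foreach { info =>
      println(info.name + ": " + info.numCachedPartitions + "/" +
              info.numPartitions + " partitions cached")
    }

    data.unpersist()                          // evict the RDD from the cache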
Re: Python, Spark and HBase
In my code I am not referencing PythonRDD or PythonRDDnewAPIHadoopFile at all. I am calling SparkContext.newAPIHadoopFile with: inputformat_class='org.apache.hadoop.hbase.mapreduce.TableInputFormat', key_class='org.apache.hadoop.hbase.io.ImmutableBytesWritable', value_class='org.apache.hadoop.hbase.client.Result' Is it possible that the typo is coming from inside the spark code? Tommer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6506.html
Re: Python, Spark and HBase
The code which causes the error is:

    sc = SparkContext("local", "My App")
    rdd = sc.newAPIHadoopFile(
        name,
        'org.apache.hadoop.hbase.mapreduce.TableInputFormat',
        'org.apache.hadoop.hbase.io.ImmutableBytesWritable',
        'org.apache.hadoop.hbase.client.Result',
        conf={"hbase.zookeeper.quorum": "my-host",
              "hbase.rootdir": "hdfs://my-host:8020/hbase",
              "hbase.mapreduce.inputtable": "data"})

The full stack trace is:

    Py4JError                                 Traceback (most recent call last)
    <ipython-input-8-3b9a4ea2f659> in <module>()
          7               conf={"hbase.zookeeper.quorum": "my-host",
          8                     "hbase.rootdir": "hdfs://my-host:8020/hbase",
          9                     "hbase.mapreduce.inputtable": "data"})
         10
         11

    /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.pyc in newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper, value_wrapper, conf)
        281         for k, v in conf.iteritems():
        282             jconf[k] = v
    --> 283         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
        284                                                     key_wrapper, value_wrapper, jconf)
        285         return RDD(jrdd, self, PickleSerializer())

    /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __getattr__(self, name)
        657         else:
        658             raise Py4JError('{0} does not exist in the JVM'.
    --> 659                             format(self._fqn + name))
        660
        661     def __call__(self, *args):

    Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile does not exist in the JVM

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6507.html
Spark Stand-alone mode job not starting (akka Connection refused)
I've been trying for several days now to get a Spark application running in stand-alone mode, as described here: http://spark.apache.org/docs/latest/spark-standalone.html I'm using pyspark, so I've been following the example here: http://spark.apache.org/docs/0.9.1/quick-start.html#a-standalone-app-in-python I've run Spark successfully in local mode using bin/pyspark, or even just setting the SPARK_HOME environment variable, proper PYTHONPATH, and then starting up python 2.7, importing pyspark, and creating a SparkContext object. Running in any kind of cluster mode seems to be the problem. The StandAlone.py program in the example just reads a file and counts lines. My SparkConf looks like this:

    from pyspark import SparkConf, SparkContext
    conf = SparkConf()
    #conf.setMaster("spark://192.168.0.9:7077")
    conf.setMaster("spark://myhostname.domain.com:7077")
    conf.setAppName("My application")
    conf.set("spark.executor.memory", "1g")

I tried a couple of configurations: Config 1 (All on one): master is localhost, slave is localhost. Config 2 (Separate master and slave): master is localhost, slave is another host. I've tried a few different machines: Machine 1: Mac OS 10.9 w/ CDH5 Hadoop distribution, compiled with the SPARK_HADOOP_VERSION=2.3.0-cdh5.0.0 option. Machines 2, 3: Centos 6.4 w/ CDH5 Hadoop distribution, compiled with the SPARK_HADOOP_VERSION=2.3.0-cdh5.0.0 option. Machine 4: Centos 6.4 with Hadoop 1.04 (default Spark compilation). Here are the results I've had: Config 1 on Machine 1: Success. Config 1 on Machine 2: Fail. Config 2 on Machines 2,3: Fail. Config 1 on Machine 4: Fail. Config 2 on Machines 1,4: Fail. In the case of failure, the error is always the same:

    akka.tcp://sp...@node4.myhostname.domain.com:43717 got disassociated, removing it.
    akka.tcp://sp...@node4.myhostname.domain.com:43717 got disassociated, removing it.
    Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.0.2.55%3A42546-2#-1875068764] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    AssociationError [akka.tcp://sparkMaster@node4:7077] -> [akka.tcp://sp...@node4.myhostname.domain.com:43717]: Error [Association failed with [akka.tcp://sp...@node4.myhostname.domain.com:43717]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sp...@node4.myhostname.domain.com:43717] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: node4.myhostname.domain.com/10.0.2.55:43717

It will then repeat this line: parentName: , name: TaskSet_0, runningTasks: 0 for a while, and then print out this message: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. I have turned the verbosity to DEBUG on all log4j.properties I can find. There are no firewalls or blocked ports on the internal network. In all configurations on all machines, when I do sbin/start-master.sh, sbin/start-slaves.sh, the respective log files always show the correct info ("I have been elected leader! New state: ALIVE" or "Successfully registered with master spark://blah-blah:7077"). The very nice UIs (on port 8080 for the masters, port 8081 for the slaves) always show that everything is in order.
The master host shows the workers, the workers acknowledge they have registered with the master. When attempting to get 'Config 1' running on any of the machines, I've put both 'localhost' and the actual fully qualified domain name of the host in conf/slaves. Results are the same. In the one case where things are working, I see messages like this in the log:

    Remoting started; listening on addresses :[akka.tcp://sparkExecutor@192.168.0.9:59049]
    Remoting now listens on addresses: [akka.tcp://sparkExecutor@192.168.0.9:59049]
    Connecting to driver: akka.tcp://spark@192.168.0.9:59032/user/CoarseGrainedScheduler
    Connecting to worker akka.tcp://sparkWorker@192.168.0.9:59005/user/Worker
    Successfully connected to akka.tcp://sparkWorker@192.168.0.9:59005/user/Worker
    Successfully registered with driver

I've tried many different variables in my spark-env.sh. Currently, in the one case that works, I set: STANDALONE_SPARK_MASTER_HOST=`hostname -f` but that's about it (setting that in the failure cases does not make them work). So to me, it seems like the messages from Akka are not getting to the workers. Any idea why this is? Thanks for the help! -T.J.
Re: Spark on an HPC setup
Hi Sid, We are successfully running Spark on an HPC; it works great. Here's info on our setup / approach. We have a cluster with 256 nodes running Scientific Linux 6.3 and scheduled by Univa Grid Engine. The environment also has a DDN GridScalar running GPFS and several EMC Isilon clusters serving NFS to the compute cluster. We wrote a custom qsub job to spin up Spark dynamically on a user-designated quantity of nodes. The UGE scheduler first designates a set of nodes that will be used to run Spark. Once the nodes are available, we use the start-master.sh script to launch a master, and send it the addresses of the other nodes. The master then starts the workers with start-all.sh. At that point, the Spark cluster is usable and remains active until the user issues a qdel, which triggers stop-all.sh on the master and takes down the cluster. This worked well for us because users can pick the number of nodes to suit their job, and multiple users can run their own Spark clusters on the same system (alongside other non-Spark jobs). We don't use HDFS for the filesystem, instead relying on NFS and GPFS, and the cluster is not running Hadoop. In tests, we've seen similar performance between our setup and Spark w/ HDFS on EC2 with higher-end instances (matched roughly for memory and number of cores). Unfortunately we can't open source the launch scripts because they contain proprietary UGE stuff, but happy to try and answer any follow-up questions. -- Jeremy - Jeremy Freeman, PhD Neuroscientist @thefreemanlab On May 28, 2014, at 11:02 AM, Sidharth Kashyap sidharth.n.kash...@outlook.com wrote: Hi, Has anyone tried to get Spark working on an HPC setup? If yes, can you please share your learnings and how you went about doing it? An HPC setup typically comes bundled with a dynamically allocated cluster and a very efficient scheduler. Configuring Spark standalone in this mode of operation is challenging as the Hadoop dependencies need to be eliminated and the cluster needs to be configured on the fly. Thanks, Sid
Re: Integration issue between Apache Shark-0.9.1 (with in-house hive-0.11) and pre-existing CDH4.6 HIVE-0.10 server
Hi, My shark-env.sh is already pointing to the hadoop2 cluster: export HADOOP_HOME=/opt/cloudera/parcels/CDH-4.6.0-1.cdh4.6.0.p0.26/lib/hadoop Both the Hadoop cluster as well as the embedded Hadoop jars within Shark are of version 2.0.0. Any more suggestions please? Thanks On Wed, May 28, 2014 at 11:00 PM, Andrew Ash and...@andrewash.com wrote: IPC version 7 vs 4 is Hadoop2 vs Hadoop1. I'm guessing your Hadoop cluster is on a different version than the .jars you're using in Shark. http://stackoverflow.com/questions/16491547/pig-to-hadoop-issue-server-ipc-version-7-cannot-communicate-with-client-version Can you try finding matching jars for your Hadoop cluster? On Wed, May 28, 2014 at 8:47 AM, bijoy deb bijoy.comput...@gmail.com wrote: Hi all, I have installed Apache Shark 0.9.1 on my machine, which comes bundled with the hive-0.11 version of the Hive jars. I am trying to integrate this with my pre-existing CDH-4.6 version of the Hive server, which is of version 0.10. ...