Re: Memory/Network Intensive Workload
Hi, not sure if this will help you.
1. Create one application that will put files into your S3 bucket from a public data source (you can use public wiki-data).
2. Create another application (a Spark Streaming one) which will listen on that bucket ^^ and perform some operation (caching, groupBy, etc.) as soon as the data kicks in.
This way you are able to utilize all network and memory. Thanks Best Regards
On Mon, Jun 30, 2014 at 12:25 AM, danilopds danilob...@gmail.com wrote: Hello, I'm studying the Spark platform and I'd like to run experiments with its Spark Streaming extension, so I guess an intensive memory and network workload would be a good option. Can anyone suggest a few typical Spark Streaming workloads that are network/memory intensive? If someone has other suggestions for good workloads on Spark Streaming, those would be interesting too. Thanks!
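For illustration, a hedged sketch of the streaming half of that setup (the s3n:// path is a placeholder and assumes S3 credentials are configured in the Hadoop conf; the persist/reduceByKey steps are just one way to stress memory and the network):

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object S3WorkloadSketch {
    def main(args: Array[String]): Unit = {
      val ssc = new StreamingContext(new SparkConf().setAppName("S3WorkloadSketch"), Seconds(60))
      // Pick up files as they appear under the watched prefix.
      val lines = ssc.textFileStream("s3n://my-bucket/wikidata/")
      lines.foreachRDD { rdd =>
        rdd.persist(StorageLevel.MEMORY_ONLY)                              // memory pressure
        val grouped = rdd.map(l => (l.length % 100, 1)).reduceByKey(_ + _) // shuffle => network traffic
        println(grouped.count())
        rdd.unpersist()
      }
      ssc.start()
      ssc.awaitTermination()
    }
  }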
Re: Spark Streaming with HBase
Something like this???

  import java.util.List;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
  import org.apache.spark.SparkConf;
  import org.apache.spark.rdd.NewHadoopRDD;
  import org.apache.spark.streaming.Duration;
  import org.apache.spark.streaming.api.java.JavaStreamingContext;
  import com.google.common.collect.Lists;

  public class SparkHBaseMain {
    public static void main(String[] arg) {
      try {
        List<String> jars = Lists.newArrayList(
            "/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar",
            "/home/akhld/Downloads/hbase-server-0.96.0-hadoop2.jar",
            "/home/akhld/Downloads/hbase-protocol-0.96.0-hadoop2.jar",
            "/home/akhld/Downloads/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
            "/home/akhld/Downloads/hbase-common-0.96.0-hadoop2.jar",
            "/home/akhld/Downloads/hbase-client-0.96.0-hadoop2.jar",
            "/home/akhld/Downloads/htrace-core-2.02.jar");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local");
        spconf.setAppName("HBaser");
        spconf.setSparkHome("/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        spconf.set("spark.executor.memory", "1g");

        JavaStreamingContext jsc = new JavaStreamingContext(spconf, new Duration(1));

        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.addResource("/home/akhld/mobi/temp/sung/hbase-site.xml");
        conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

        // Read the HBase table through the new Hadoop InputFormat API.
        NewHadoopRDD<ImmutableBytesWritable, Result> rdd2 =
            new NewHadoopRDD<ImmutableBytesWritable, Result>(
                jsc.ssc().sc(), TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class, conf);

        System.out.println(rdd2.count());
        jsc.start();
      } catch (Exception e) {
        e.printStackTrace();
        System.out.println("Crashed: " + e);
      }
    }
  }

Thanks Best Regards On Sun, Jun 29, 2014 at 10:16 PM, N.Venkata Naga Ravi nvn_r...@hotmail.com wrote: Hi, Is there any example provided for Spark Streaming with input provided from HBase table content? Thanks, Ravi
How to control a spark application(executor) using memory amount per node?
Hi, When I run the following statements in spark-shell:

  val file = sc.textFile("hdfs://nameservice1/user/study/spark/data/soc-LiveJournal1.txt")
  val count = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
  println(count.count())

it throws an exception:

  ..
  14/06/30 15:50:53 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
  java.lang.OutOfMemoryError: Java heap space
          at java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2346)
          at java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2275)
          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
          at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28)
          at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:176)
          at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
          at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
          at org.apache.spark.scheduler.Task.run(Task.scala:53)
          at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
          at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
          at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
          at java.security.AccessController.doPrivileged(Native Method)
          at javax.security.auth.Subject.doAs(Subject.java:415)
          at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
          at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:744)

Then I set the following configuration in spark-env.sh:

  export SPARK_EXECUTOR_MEMORY=1G

but it's still not OK. spark.png http://apache-spark-user-list.1001560.n3.nabble.com/file/n8521/spark.png I found that when I start spark-shell, the console also prints this log line: SparkDeploySchedulerBackend: Granted executor ID app-20140630144110-0002/0 on hostPort dlx8:7078 with 8 cores, *512.0 MB RAM* How do I increase the 512.0 MB RAM to more memory? Please help!
Re: Spark RDD member of class loses its value when the class is being used as graph attribute
Can you share some example code of what you are doing? BTW, Gmail flags your mail as spam, saying it cannot verify it came from yahoo.com. You might want to check your mail client settings. (It could be a Gmail or Yahoo bug too, of course.) On Fri, Jun 27, 2014 at 4:29 PM, harsh2005_7 harsh200...@yahoo.com wrote: Hi, I have a scenario where I have a class X with constructor parameters (RDD, Double). When I initialize the class object with the corresponding RDD and double value (of name, say, x1) and *put it as a vertex attribute in a graph*, I lose my RDD value. The Double value remains intact. I tried simultaneously accessing the RDD from the instance variable (x1) and I see it intact there, but for some reason it's not available when I take the graph vertex attribute and access the RDD. Please help me understand which concept I am missing here, and what's the correct way to do it.
Re: Spark RDD member of class loses its value when the class is being used as graph attribute
The code base is huge, but I am sharing a snapshot of it which I think might give you some idea. Here is my class Player, which is supposed to be my vertex attribute:

  class Player(var RvalRdd: RDD[((Int, Int), Double)], Slope_m: Double) extends Serializable {
    // Some code here
  }

As you can see, this takes constructor parameters, one of which is an RDD. Here is my main execution class where I create vertices with attributes of players:

  object RLSparkExceution2 {
    // Some code here
    val sc = new SparkContext("local", "Multiplayer")

    val p1 = new Player(initRvalRdd(sc), m1) // Instantiates my RDD field
    val p2 = new Player(initRvalRdd(sc), m2)
    val dummy_player = new Player(sc.parallelize(List(((0, 0), 0.0))), 0)
    val dummy_player_msg = new Player(sc.parallelize(List(((0, 0), 0.0))), 0)

    val players: RDD[(VertexId, Player)] =
      sc.parallelize(Array((1L, p1), (2L, p2), (0L, dummy_player)))
    val connections: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(1L, 0L, "dummyConnect"), Edge(2L, 0L, "dummyConnect")))
    var graph = Graph(players, connections)

    def main(args: Array[String]): Unit = {
      println(p1.RvalRdd.count) // Here this gets printed
      // This one throws a NullPointerException
      graph.vertices.foreach(vtx => if (vtx._1 != 0L)
        println(s"test to see rval in beginning ${vtx._1} :: ${vtx._2.RvalRdd.count}"))
    }
  }
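A note on what the snippet runs into: RDD actions such as count can only be invoked on the driver, so an RDD handle stored inside a vertex attribute is not usable from within graph.vertices.foreach, which runs inside tasks. Below is a hedged sketch of the usual workaround, storing plain serializable data instead of RDD handles in the attribute (class and value names are illustrative, not the poster's actual code):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.graphx.{Edge, Graph, VertexId}
  import org.apache.spark.rdd.RDD

  object VertexDataSketch {
    // Plain data: serializable and safe to ship to workers inside vertex attributes.
    class PlayerData(val rvals: Array[((Int, Int), Double)], val slope: Double) extends Serializable

    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("VertexDataSketch").setMaster("local"))
      // Materialize the RDD on the driver before embedding it in the attribute.
      val rvals = sc.parallelize(List(((0, 0), 0.0))).collect()
      val players: RDD[(VertexId, PlayerData)] =
        sc.parallelize(Array((1L, new PlayerData(rvals, 0.5)), (0L, new PlayerData(rvals, 0.0))))
      val connections: RDD[Edge[String]] = sc.parallelize(Array(Edge(1L, 0L, "dummyConnect")))
      val graph = Graph(players, connections)
      // Accessing plain data inside a task works; calling RDD actions there does not.
      graph.vertices.foreach { case (id, p) => println(s"$id has ${p.rvals.length} rvals") }
    }
  }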
Configuration properties for Spark
Hi: Is there a comprehensive properties list (with permissible/default values) for Spark? Thanks Mans
Callbacks on freeing up of RDDs
Hi all, I am trying to create a custom RDD class for the result set of queries supported in InMobi Grill (http://inmobi.github.io/grill/). Each result set has a schema (similar to Hive's TableSchema) and a path in HDFS containing the result set data. An easy way of doing this would be to create a temp table in Hive and use HCatInputFormat to create an RDD via the newAPIHadoopRDD call. I've already done this and it works. However, I also want to *delete* the temp table when the RDD is unpersisted, or when the SparkContext is gone. How could I do that in Spark? Does Spark allow users to register code to be executed when an RDD is freed? Something like the OutputCommitter in Hadoop? Thanks, Jaideep
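For what it's worth, a hedged sketch of one possible hook using Spark 1.0's listener API (this assumes SparkListenerUnpersistRDD events, which are posted asynchronously; the cleanup action here is just a println placeholder for dropping the temp table, and this does not cover the SparkContext-shutdown case):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.scheduler.{SparkListener, SparkListenerUnpersistRDD}

  object CleanupSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("CleanupSketch").setMaster("local"))
      val rdd = sc.parallelize(1 to 10).cache()
      val watchedId = rdd.id
      sc.addSparkListener(new SparkListener {
        override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
          // Placeholder for real cleanup, e.g. dropping the Hive temp table.
          if (event.rddId == watchedId) println(s"RDD $watchedId unpersisted; drop temp table here")
        }
      })
      rdd.count()
      rdd.unpersist() // the listener fires asynchronously after this
    }
  }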
Re: org.jboss.netty.channel.ChannelException: Failed to bind to: master/1xx.xx..xx:0
Hi all, I reinstalled Spark and rebooted the system, but I am still not able to start the workers. It's throwing the following exception:

  Exception in thread "main" org.jboss.netty.channel.ChannelException: Failed to bind to: master/192.168.125.174:0

I suspect the problem is with 192.168.125.174:0. Even though the command contains master:7077, why is it showing 0 in the log?

  java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://master:7077

Can somebody suggest a solution? Thanks & Regards, Meethu M

On Friday, 27 June 2014 4:28 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, yes, I tried setting another port too, but the same problem. master is set in /etc/hosts. Thanks & Regards, Meethu M

On Friday, 27 June 2014 3:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: That's strange. Did you try setting the master port to something else (use SPARK_MASTER_PORT)? Also, you said you are able to start it from the java command line: java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://:master:7077 What is the master IP specified here? Is it like you have an entry for master in /etc/hosts? Thanks Best Regards

On Fri, Jun 27, 2014 at 3:09 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Akhil, I am running it in a LAN itself. The IP of the master is given correctly. Thanks & Regards, Meethu M

On Friday, 27 June 2014 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Why is it binding to port 0? 192.168.125.174:0 :/ Check the IP address of that master machine (ifconfig); it looks like the IP address has been changed (hoping you are running these machines on a LAN). Thanks Best Regards

On Fri, Jun 27, 2014 at 12:00 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, My Spark (standalone mode) was running fine till yesterday. But now I am getting the following exception when I run start-slaves.sh or start-all.sh:

  slave3: failed to launch org.apache.spark.deploy.worker.Worker:
  slave3:   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
  slave3:   at java.lang.Thread.run(Thread.java:662)

The log files have the following lines:

  14/06/27 11:06:30 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
  14/06/27 11:06:30 INFO SecurityManager: Changing view acls to: hduser
  14/06/27 11:06:30 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser)
  14/06/27 11:06:30 INFO Slf4jLogger: Slf4jLogger started
  14/06/27 11:06:30 INFO Remoting: Starting remoting
  Exception in thread "main" org.jboss.netty.channel.ChannelException: Failed to bind to: master/192.168.125.174:0
          at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
  ...
  Caused by: java.net.BindException: Cannot assign requested address
  ...

I saw the same error reported before and have tried the following solutions: set the variable SPARK_LOCAL_IP, changed SPARK_MASTER_PORT to a different number. But nothing is working.
When I try to start the worker from the respective machines using the following java command, it runs without any exception:

  java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://:master:7077

Somebody please suggest a solution. Thanks & Regards, Meethu M
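"Cannot assign requested address" usually means the process is trying to bind an IP that no local interface currently owns. As a hedged diagnostic (the hostname "master" is taken from /etc/hosts as in the thread; run it on the machine whose worker fails), compare what the name resolves to against the machine's actual address, and point SPARK_LOCAL_IP at a valid local IP if they differ:

  import java.net.InetAddress

  object BindCheck {
    def main(args: Array[String]): Unit = {
      // What the configured hostname resolves to on this machine:
      println("master -> " + InetAddress.getByName("master").getHostAddress)
      // What this machine considers its own address:
      println("local  -> " + InetAddress.getLocalHost.getHostAddress)
    }
  }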
Re: How to control a spark application(executor) using memory amount per node?
Hi, Try setting --driver-java-options with spark-submit, or set spark.executor.extraJavaOptions in spark-defaults.conf. Thanks & Regards, Meethu M

On Monday, 30 June 2014 1:28 PM, hansen han...@neusoft.com wrote: Hi, When I run the following statements in spark-shell: val file = sc.textFile("hdfs://nameservice1/user/study/spark/data/soc-LiveJournal1.txt") [...] How do I increase the 512.0 MB RAM to more memory? Please help!
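For a standalone application, a minimal hedged sketch of the same idea in code (the 2g value is illustrative and the HDFS path mirrors the thread's; note spark.executor.memory only takes effect if set before the SparkContext is created, so for spark-shell it has to come from the environment or config files rather than from code):

  import org.apache.spark.{SparkConf, SparkContext}

  object MoreExecutorMemory {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("MoreExecutorMemory")
        .set("spark.executor.memory", "2g") // must be set before the context starts
      val sc = new SparkContext(conf)
      println(sc.textFile("hdfs://nameservice1/user/study/spark/data/soc-LiveJournal1.txt").count())
    }
  }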
TaskNotSerializable when invoking KMeans.run
Hello, I’m trying to use KMeans with MLLib but am getting a TaskNotSerializable error. I’m using Spark 0.9.1 and invoking the KMeans.run method with k = 2 and numPartitions = 200. Has anyone seen this error before and know what could be the reason for this? Thanks, Daniel
Serializer or Out-of-Memory issues?
I'm trying to perform operations on a large RDD, which ends up being about 1.3 GB in memory when loaded in. It's being cached in memory during the first operation, but when another task begins that uses the RDD, I'm getting this error that says the RDD was lost:

  14/06/30 09:48:17 INFO TaskSetManager: Serialized task 1.0:4 as 8245 bytes in 0 ms
  14/06/30 09:48:17 WARN TaskSetManager: Lost TID 15611 (task 1.0:3)
  14/06/30 09:48:17 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
  org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "/Users/me/Desktop/spark-1.0.0/python/pyspark/worker.py", line 73, in main
      command = pickleSer._read_with_length(infile)
    File "/Users/me/Desktop/spark-1.0.0/python/pyspark/serializers.py", line 142, in _read_with_length
      length = read_int(stream)
    File "/Users/me/Desktop/spark-1.0.0/python/pyspark/serializers.py", line 337, in read_int
      raise EOFError
  EOFError
          at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
          at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
          at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
          at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
          at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
          at org.apache.spark.scheduler.Task.run(Task.scala:51)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:745)
  14/06/30 09:48:18 INFO AppClient$ClientActor: Executor updated: app-20140630090515-/0 is now FAILED (Command exited with code 52)
  14/06/30 09:48:18 INFO SparkDeploySchedulerBackend: Executor app-20140630090515-/0 removed: Command exited with code 52
  14/06/30 09:48:18 INFO SparkDeploySchedulerBackend: Executor 0 disconnected, so removing it
  14/06/30 09:48:18 ERROR TaskSchedulerImpl: Lost executor 0 on localhost: OutOfMemoryError
  14/06/30 09:48:18 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 1.0
  14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15610 (task 1.0:2)
  14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15609 (task 1.0:1)
  14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15612 (task 1.0:4)
  14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15608 (task 1.0:0)

The operation it fails on is a reduceByKey(), and the RDD before the operation is split into several thousand partitions (I'm doing term weighting that requires a different partition initially for each document). The system has 6 GB of memory for the executor, so I'm not sure if it's actually a memory error, as is mentioned 5 lines from the end of the error. The serializer error portion is what's really confusing me, and I can't find references to this particular error with Spark anywhere. Does anyone have a clue as to what the actual error might be here, and what a possible solution would be?
Spark 1.0 docs out of sync?
I'm hoping someone can clear up some confusion for me. When I view the Spark 1.0 docs online (http://spark.apache.org/docs/1.0.0/) they are different from the docs packaged with the Spark 1.0.0 download (spark-1.0.0.tgz). In particular, in the online docs there's a single merged Spark Programming Guide: [image: Inline image 1] Whereas in the docs in the download package there are still three separate guides: [image: Inline image 2] Plus there are several other differences: the color scheme is different (orange vs. blue), and there are several content differences. The first one is on the Overview page, e.g. "Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` installed on your system `PATH`" vs. "Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy to run locally on one machine --- all you need is to have `java` installed on your system `PATH`". Can someone clarify? And more importantly, where can I download the *official* 1.0 docs to build locally? Thanks! Diana
Re: TaskNotSerializable when invoking KMeans.run
Could you post the code snippet and the error stack trace? -Xiangrui On Mon, Jun 30, 2014 at 7:03 AM, Daniel Micol dmi...@gmail.com wrote: Hello, I’m trying to use KMeans with MLLib but am getting a TaskNotSerializable error. I’m using Spark 0.9.1 and invoking the KMeans.run method with k = 2 and numPartitions = 200. Has anyone seen this error before and know what could be the reason for this? Thanks, Daniel
spark streaming counter metrics
I am new to Spark Streaming and am wondering whether Spark Streaming tracks counters (e.g., how many rows in each consumer, how many rows routed to an individual reduce task, etc.) in any form, so I can get an idea of how the data is skewed. I checked the Spark job page but don't seem to find any. -- Chen Song
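One hedged way to collect such counts yourself is with accumulators; a minimal sketch (the socket source, batch size, and what exactly gets counted are all illustrative):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object CounterSketch {
    def main(args: Array[String]): Unit = {
      val ssc = new StreamingContext(new SparkConf().setAppName("CounterSketch"), Seconds(10))
      // Driver-side counter; worker tasks only add to it.
      val rowsSeen = ssc.sparkContext.accumulator(0L)
      val lines = ssc.socketTextStream("localhost", 9999)
      lines.foreachRDD { rdd =>
        rdd.foreach(_ => rowsSeen += 1L)                 // count rows on the executors
        println(s"total rows so far: ${rowsSeen.value}") // read the total on the driver
      }
      ssc.start()
      ssc.awaitTermination()
    }
  }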
Re: Could not compute split, block not found
Tobias, Your suggestion is very helpful. I will definitely investigate it. Just curious: suppose the batch size is t seconds. In practice, does Spark always require the program to finish processing the data of t seconds within t seconds' processing time? Can Spark begin to consume a new batch before it finishes processing the previous batch? If Spark can do them together, it may save processing time and solve the problem of data piling up. Thanks! Bill On Mon, Jun 30, 2014 at 4:49 AM, Tobias Pfeiffer t...@preferred.jp wrote: If your batch size is one minute and it takes more than one minute to process, then I guess that's what causes your problem. The processing of the second batch will not start until the processing of the first is finished, which leads to more and more data being stored and waiting for processing; check the attached graph for a visualization of what I think may happen. Can you maybe do something hacky like throwing away a part of the data so that processing time gets below one minute, then check whether you still get that error? Tobias On Mon, Jun 30, 2014 at 1:56 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Tobias, Thanks for your help. I think in my case, the batch size is 1 minute. However, it takes my program more than 1 minute to process 1 minute's data. I am not sure whether it is because the unprocessed data pile up. Do you have a suggestion on how to check it and solve it? Thanks! Bill On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, were you able to process all information in time, or did maybe some unprocessed data pile up? I think when I saw this once, the reason seemed to be that I had received more data than would fit in memory, while waiting for processing, so old data was deleted. When it was time to process that data, it didn't exist any more. Is that a possible reason in your case? Tobias On Sat, Jun 28, 2014 at 5:59 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi, I am running a Spark Streaming job with 1 minute as the batch size. It ran around 84 minutes and was killed because of an exception with the following information: java.lang.Exception: Could not compute split, block input-0-1403893740400 not found Before it was killed, it was able to correctly generate output for each batch. Any help on this will be greatly appreciated. Bill
Help understanding spark.task.maxFailures
Hi community, this one should be an easy one: I have left spark.task.maxFailures at its default (which should be 4). I see a job that shows the following statistics for tasks: Succeeded/Total: 7109/819 (1 failed). So there were 819 tasks to start with, and I have 2 executors in that cluster. Per the Spark docs, spark.task.maxFailures is the number of times to try a task before giving up on the job. So I was expecting that 819*4 (i.e., 3276) would be the maximum number ever to appear under succeeded (accounting for retries of every possible task). Even 3276*2 (6552, if it's per task per executor) does not account for 7109 successful tasks. Could anyone help explain why I'm seeing such a high number of succeeded tasks?
Spark 1.0: Reading JSON LZH Compressed File
Hi, Spark 1.0 has been installed as standalone, but it can't read any compressed (CMX/Snappy) or SequenceFile data residing on HDFS. The key notable message is "Unable to load native-hadoop library". Other related messages are:

  Caused by: java.lang.IllegalStateException: Cannot load com.ibm.biginsights.compress.CmxDecompressor without native library!
          at com.ibm.biginsights.compress.CmxDecompressor.<clinit>(CmxDecompressor.java:65)

Here is the key part of core-site.xml:

  <property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,com.ibm.biginsights.compress.CmxCodec</value>
  </property>

Here is spark-env.sh:

  export SPARK_WORKER_CORES=4
  export SPARK_WORKER_MEMORY=10g
  export SCALA_HOME=/opt/spark/scala-2.11.1
  export JAVA_HOME=/opt/spark/jdk1.7.0_55
  export SPARK_HOME=/opt/spark/spark-0.9.1-bin-hadoop2
  export ADD_JARS=/opt/IHC/lib/compression.jar
  export SPARK_CLASSPATH=/opt/IHC/lib/compression.jar
  export SPARK_LIBRARY_PATH=/opt/IHC/lib/native/Linux-amd64-64/
  export SPARK_MASTER_WEBUI_PORT=1080
  export HADOOP_CONF_DIR=/opt/IHC/hadoop-conf

Note: CMX is an IBM-branded splittable LZO-based compression codec. Any help is appreciated. Thanks, Nasir
RE: Serialization of objects
Hi everyone, I was able to solve this issue. For now I changed the library code and added the following to the class com.wcohen.ss.BasicStringWrapper:

  public class BasicStringWrapper implements Serializable

However, I am still curious to know how to get around the issue when you don't have access to the code and you are using a 3rd party jar.

From: ssti...@live.com To: u...@spark.incubator.apache.org Subject: Serialization of objects Date: Thu, 26 Jun 2014 09:30:31 -0700

Hi everyone, Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd party library, namely instances of com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to be) using Kryo for serialization, but I still face the serialization issue. I get:

  org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper

Any help with this will be great. Scala code:

  package approxstrmatch

  import com.wcohen.ss.BasicStringWrapper
  import com.wcohen.ss.Jaccard
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.KryoRegistrator

  class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[approxstrmatch.JaccardScore])
      kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
      kryo.register(classOf[com.wcohen.ss.Jaccard])
    }
  }

  class JaccardScore {
    val mjc = new Jaccard() with Serializable
    val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
    val sc = new SparkContext(conf)

    def calculateScoreSecond(sourcerdd: RDD[String], destrdd: RDD[String]) {
      val jc_ = this.mjc
      var i: Int = 0
      for (sentence <- sourcerdd.toLocalIterator) {
        val str1 = new BasicStringWrapper(sentence)
        var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
        val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)
        scorevector.saveAsTextFile(fileName)
        i += 1
      }
    }
  }

Here is the script:

  val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt")
  val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt")
  val score = new approxstrmatch.JaccardScore()
  score.calculateScoreSecond(srcFile, distFile)

Output:

  14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at console:12), which has no missing parents
  14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at console:12)
  14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
  14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
  14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms
  14/06/25 12:32:05 INFO Executor: Running task ID 0
  14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
  14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp
  14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader
  14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 1403724701562
  14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp
  14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader
  14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
  14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+140
  14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
  14/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver
  14/06/25 12:32:06 INFO Executor: Finished task ID 0
  14/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)
  14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)
  14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
  14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s
  14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s
  14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 1
  14/06/25
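On the open question of third-party jars you can't modify: a common hedged workaround is to avoid serializing the object at all and let each executor JVM construct its own copy via a @transient lazy val wrapper. A sketch using the same secondstring classes (master URL and sample strings are illustrative):

  import org.apache.spark.{SparkConf, SparkContext}
  import com.wcohen.ss.{BasicStringWrapper, Jaccard}

  object ThirdPartySketch {
    // The wrapper is serializable, but the non-serializable Jaccard instance is
    // @transient lazy: skipped during serialization and rebuilt on first use per JVM.
    class JaccardHolder extends Serializable {
      @transient lazy val jaccard = new Jaccard()
    }

    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("ThirdPartySketch").setMaster("local"))
      val holder = new JaccardHolder
      val scores = sc.parallelize(Seq("apache spark", "apache hadoop"))
        .map(s => holder.jaccard.score(new BasicStringWrapper("apache spark"),
                                       new BasicStringWrapper(s)))
      scores.collect().foreach(println)
    }
  }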
Spark 1.0 and Logistic Regression Python Example
Hi, I modified the example code for logistic regression to compute the error in classification. Please see below. However, the code is failing when it makes the call labelsAndPreds.filter(lambda (v, p): v != p).count() with the following error message (something related to numpy or the dot product):

  File "/opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/classification.py", line 65, in predict
    margin = _dot(x, self._coeff) + self._intercept
  File "/opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/_common.py", line 443, in _dot
    return vec.dot(target)
  AttributeError: 'numpy.ndarray' object has no attribute 'dot'

FYI, I am running the code using spark-submit, i.e. ./bin/spark-submit examples/src/main/python/mllib/logistic_regression2.py The code is posted below if it will be useful in any way:

  from math import exp
  import sys
  import time

  from pyspark import SparkContext
  from pyspark.mllib.classification import LogisticRegressionWithSGD
  from pyspark.mllib.regression import LabeledPoint
  from numpy import array

  # Load and parse the data
  def parsePoint(line):
      values = [float(x) for x in line.split(',')]
      if values[0] == -1:   # Convert -1 labels to 0 for MLlib
          values[0] = 0
      return LabeledPoint(values[0], values[1:])

  sc = SparkContext(appName="PythonLR")

  # start timing
  start = time.time()
  #start = time.clock()

  data = sc.textFile("sWAMSpark_train.csv")
  parsedData = data.map(parsePoint)

  # Build the model
  model = LogisticRegressionWithSGD.train(parsedData)

  # load test data
  testdata = sc.textFile("sWSpark_test.csv")
  parsedTestData = testdata.map(parsePoint)

  # Evaluating the model on test data
  labelsAndPreds = parsedTestData.map(lambda p: (p.label, model.predict(p.features)))
  trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
  print("Training Error = " + str(trainErr))

  end = time.time()
  print("Time is = " + str(end - start))
odd caching behavior or accounting
Hi All, I am resending this message because I suspect the original may have been blocked from the mailing list due to attachments. Note that the mail does appear in the Apache archives http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3CCANR-kKeO3mxL1QuX0fnz0DEPkU4FFbXO2W_5CdmtrzYKUfhaBg%40mail.gmail.com%3E but not on Nabble, the online archive linked from the Spark website http://apache-spark-user-list.1001560.n3.nabble.com/. The text of the original message appears below; the PDF http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/raw/%3ccanr-kkeo3mxl1qux0fnz0depku4ffbxo2w_5cdmtrzykufh...@mail.gmail.com%3e/2 and PNG http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/raw/%3ccanr-kkeo3mxl1qux0fnz0depku4ffbxo2w_5cdmtrzykufh...@mail.gmail.com%3e/3 files originally attached are now available as linked from the Apache archive. best, -Brad -- Forwarded message -- From: Brad Miller bmill...@eecs.berkeley.edu Date: Mon, Jun 30, 2014 at 10:20 AM Subject: odd caching behavior or accounting To: user@spark.apache.org Hi All, I've recently noticed some caching behavior which I did not understand and which may or may not indicate a bug. In short, the web UI seemed to indicate that some blocks were being added to the cache despite already being in the cache. As documentation, I have attached two UI screenshots. The PNG captures enough of the screen to demonstrate the problem; the PDF is the printout of the full page. Notice that:
- block rdd_21_1001 is in the cache twice, both times on letang.research.intel-research.net; many other blocks also occur twice on a variety of hosts. I've not confirmed that the duplicate block is *always* on the same host, but it seems to appear that way.
- the stated storage level is "Memory Deserialized 1x Replicated".
- the top left states that the cached partitions and total partitions are 4000, but in the table where partitions are enumerated there are 4534.
Although not reflected in this screenshot, I believe I have seen this behavior occur even when double caching of blocks causes eviction of blocks from other RDDs. I am running the Spark 1.0.0 release and using pyspark. best, -Brad
Re: Could not compute split, block not found
Bill, let's say the processing time is t' and the window size t. Spark does not *require* t' < t. In fact, for *temporary* peaks in your streaming data, I think the way Spark handles it is very nice, in particular since 1) it does not mix up the order in which items arrived in the stream, so items from a later window will always be processed later, and 2) an increase in data will not be punished with high load and unresponsive systems, but with disk space consumption instead. However, if all of your windows require t' > t processing time (and it's not because you are waiting, but because you actually do some computation), then you are in bad luck, because if you start processing the next window while the previous one is still being processed, you have fewer resources for each and processing will take even longer. However, if you are only waiting (e.g., for network I/O), then maybe you can employ some asynchronous solution where your tasks return immediately and deliver their results via a callback later? Tobias On Tue, Jul 1, 2014 at 2:26 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Tobias, Your suggestion is very helpful. I will definitely investigate it. [...]
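For the callback idea, a hedged sketch of one way to do it: collect each batch to the driver and hand it to a Scala Future so the streaming job is not blocked on the slow call. Here postToExternalService is a hypothetical stand-in for any network-bound operation, and the pattern only makes sense if a batch is small enough to collect:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  object AsyncSketch {
    // Hypothetical slow, network-bound call; stands in for any external service.
    def postToExternalService(lines: Array[String]): Unit = Thread.sleep(5000)

    def main(args: Array[String]): Unit = {
      val ssc = new StreamingContext(new SparkConf().setAppName("AsyncSketch"), Seconds(60))
      val lines = ssc.socketTextStream("localhost", 9999)
      lines.foreachRDD { rdd =>
        val batch = rdd.collect() // only sensible if a batch fits on the driver
        // Hand off to a Future so foreachRDD returns immediately and the next
        // batch is not blocked behind the slow network call.
        Future(postToExternalService(batch))
          .onComplete(_ => println(s"batch of ${batch.length} lines done"))
      }
      ssc.start()
      ssc.awaitTermination()
    }
  }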
Re: TaskNotSerializable when invoking KMeans.run
Hi Daniel, I also faced the same issue when using the Naive Bayes classifier in MLlib. I was able to solve it by making all fields in the calling object either transient or serializable. Spark prints which class's object it was not able to serialize in the error message; that can give you a hint. Thanks, Jaideep On Mon, Jun 30, 2014 at 7:33 PM, Daniel Micol dmi...@gmail.com wrote: Hello, I’m trying to use KMeans with MLLib but am getting a TaskNotSerializable error. I’m using Spark 0.9.1 and invoking the KMeans.run method with k = 2 and numPartitions = 200. Has anyone seen this error before and know what could be the reason for this? Thanks, Daniel
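To illustrate the transient-or-serializable advice with a toy example (the PrintWriter stands in for whatever non-serializable field Spark names in the stack trace; everything here is illustrative):

  import java.io.PrintWriter
  import org.apache.spark.{SparkConf, SparkContext}

  class Trainer(factor: Int) extends Serializable {
    // Non-serializable helper: @transient (and lazy) so it is skipped when tasks
    // serialize the enclosing object, and rebuilt on demand if ever touched.
    @transient lazy val log = new PrintWriter(System.err)

    def run(sc: SparkContext): Long = {
      // Referencing `factor` captures `this`, so Trainer itself must serialize.
      val n = sc.parallelize(1 to 100).map(_ * factor).count()
      log.println(s"counted $n")
      n
    }
  }

  object TransientSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("TransientSketch").setMaster("local"))
      println(new Trainer(2).run(sc))
    }
  }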
Re: History Server rendered page not suitable for load balancing
Done :)
Re: little confused about SPARK_JAVA_OPTS alternatives
Hi Andrew, I'm actually using spark-submit, and I tried using spark.executor.extraJavaOpts to configure the Tachyon client to connect to the Tachyon HA master; however, the configuration settings were not picked up. On the other hand, when I set the same Tachyon configuration parameters through SPARK_JAVA_OPTS or /conf/java_opts, it actually worked. IMHO the Tachyon client classes are loaded into the JVM, and since they are mostly singletons, system properties are not being refreshed. Let me know if you need more info. I have logs from both runs, and I can try different settings on my Spark cluster (I am running Spark on Mesos in fine-grained mode). Best regards Lukasz
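For comparison, a hedged sketch of how executor JVM flags are normally injected in Spark 1.0 (spark.executor.extraJavaOptions is the documented property name; the Tachyon -D flag below is purely a placeholder):

  import org.apache.spark.{SparkConf, SparkContext}

  object ExecutorOptsSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("ExecutorOptsSketch")
        // JVM flags must be in place when the executor JVM launches; properties set
        // later are invisible to singletons that read them at class initialization.
        .set("spark.executor.extraJavaOptions", "-Dtachyon.zookeeper.address=zkhost:2181") // placeholder flag
      val sc = new SparkContext(conf)
      println(sc.parallelize(1 to 10).count())
    }
  }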
Re: spark job stuck when running on mesos fine grained mode
Hi Prabeesh, I've recently moved to Mesos 0.18.2 and Spark 1.0; so far no problems in fine-grained mode, even for GraphX or MLlib workflows. If you have specific code snippets, I can try them out. Best regards Lukasz
Re: Spark 1.0 and Logistic Regression Python Example
You were using an old version of numpy, 1.4? I think this is fixed in the latest master. Try replacing vec.dot(target) with numpy.dot(vec, target), or use the latest master. -Xiangrui On Mon, Jun 30, 2014 at 2:04 PM, Sam Jacobs sam.jac...@us.abb.com wrote: Hi, I modified the example code for logistic regression to compute the error in classification. [...]