Re: Is there a way to get previous/other keys' state in Spark Streaming?
Thank you, TD. This is important information for us. Will keep an eye on that. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 6:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Yes, this is the limitation of the current implementation. But this will be improved a lot when we have IndexedRDD (https://github.com/apache/spark/pull/1297) in Spark, which allows faster single-value updates to a key-value store (within each partition, without processing the entire partition). Soon. TD

On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you. Yes, it behaves as you described. Sorry for missing this point. Then my only concern is on the performance side - since Spark Streaming operates on all the keys every time a new batch comes, I think it is fine when the state size is small. When the state size becomes big, say, a few GBs, if we still go through the whole key list, would the operation be a little inefficient then? Maybe I am missing some points in Spark Streaming that handle this situation. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The updateFunction given in updateStateByKey should be called on ALL the keys that are in the state, even if there is no new data in the batch for some key. Is that not the behavior you see? What do you mean by "show all the existing states"? You have access to the latest state RDD by doing stateStream.foreachRDD(...). There you can do whatever operation on all the key-state pairs. TD

On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you for the quick reply and backing my approach. :) 1) The example is this: 1. In the first 2-second interval, after updateStateByKey, I get a few keys and their states, say, (a -> 1, b -> 2, c -> 3). 2. In the following 2-second interval, I only receive c and d and their values. But I want to update/display the state of a and b accordingly. * It seems I have no way to access a and b and get their states. * Also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I need to bundle all the rest of the keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B is present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
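[Editor's note] For readers following along, here is a minimal sketch of the pattern TD describes: updateStateByKey touches every key in the state on each batch, and foreachRDD exposes the full state RDD. The socket source, batch interval, and word-counting logic are illustrative assumptions, not from the thread:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulCounts {
      def main(args: Array[String]) {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("StatefulCounts"), Seconds(2))
        ssc.checkpoint("/tmp/checkpoints") // required by updateStateByKey

        val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

        // Called every batch for EVERY key in the state,
        // even keys with no new data in this batch.
        val updateFunc = (newValues: Seq[Int], state: Option[Int]) =>
          Some(state.getOrElse(0) + newValues.sum)

        val stateStream = pairs.updateStateByKey(updateFunc)

        // The latest state RDD holds ALL key-state pairs,
        // not just the keys seen in the current batch.
        stateStream.foreachRDD { rdd =>
          rdd.collect().foreach { case (k, v) => println(k + " -> " + v) }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }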
error from DecisionTree training
Hi All, I got an error while using DecisionTreeModel (my program is written in Java, Spark 1.0.1, Scala 2.10.1). I read a local file, loaded it as an RDD, and then sent it to DecisionTree for training. See below for details:

    JavaRDD<LabeledPoint> Points = lines.map(new ParsePoint()).cache();
    LogisticRegressionModel model =
        LogisticRegressionWithSGD.train(Points.rdd(), iterations, stepSize); // until here it is working
    Strategy strategy = new Strategy( ... );
    DecisionTree decisionTree = new DecisionTree(strategy);
    DecisionTreeModel decisionTreeModel = decisionTree.train(Points.rdd());

The error is:

    java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.regression.LabeledPoint;

Any thoughts? Best regards, Jack
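[Editor's note] The cast error appears to come from array/ClassTag erasure when the Java-created RDD is handed to the Scala training code. For comparison, the Scala API sidesteps that issue; a sketch assuming the Spark 1.0 MLlib API, with a made-up input path and parameters:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.mllib.tree.configuration.Algo
    import org.apache.spark.mllib.tree.impurity.Gini
    import org.apache.spark.mllib.util.MLUtils

    val sc = new SparkContext("local[2]", "DecisionTreeExample")

    // LIBSVM-formatted labeled points; the path is illustrative.
    val points = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt").cache()

    // Convenience overload taking algo, impurity, and max tree depth.
    val model = DecisionTree.train(points, Algo.Classification, Gini, 5)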
Re: Spark Streaming
Thanks Tathagata! I tried it, and it worked out perfectly.

On Thu, Jul 17, 2014 at 6:34 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Step 1: use rdd.mapPartitions(). That's equivalent to the mapper of MapReduce. You can open a connection, get all the data and buffer it, close the connection, and return an iterator to the buffer. Step 2: Make step 1 better by making it reuse connections. You can use singletons / static vars to lazily initialize and reuse a pool of connections. You will have to take care of concurrency, as multiple tasks may be using the database in parallel in the same worker JVM. TD

On Thu, Jul 17, 2014 at 4:41 PM, Guangle Fan fanguan...@gmail.com wrote: Hi, All When I run Spark Streaming, in one of the flatMap stages, I want to access a database. Code looks like:

    stream.flatMap(
      new FlatMapFunction {
        call () { //access database cluster }
      }
    )

Since I don't want to create a database connection every time call() is called, where is the best place to create the connection and reuse it on a per-host basis (like one database connection per Mapper/Reducer)? Regards, Guangle
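[Editor's note] A sketch of the two steps TD outlines. The DbConnection trait and createConnection factory are hypothetical stand-ins for a real database driver:

    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical client API -- replace with your actual driver.
    trait DbConnection { def lookup(key: String): String }
    def createConnection(): DbConnection = ???

    // Step 2: lazily initialize one connection per worker JVM and reuse it.
    // A real pool must also be thread-safe, since tasks run concurrently.
    object ConnectionHolder {
      lazy val conn: DbConnection = createConnection()
    }

    def enrich(stream: DStream[String]): DStream[String] =
      stream.mapPartitions { records =>
        // Step 1: one connection per partition; buffer results, return an iterator.
        val conn = ConnectionHolder.conn
        records.map(conn.lookup).toList.iterator
      }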
Re: Large scale ranked recommendation
It is very true that making predictions in batch for all 1 million users against the 10k items will be quite onerous in terms of computation. I have run into this issue too when making batch predictions. Some ideas:

1. Do you really need to generate recommendations for each user in batch? How are you serving these recommendations? In most cases, you only need to make recs when a user is actively interacting with your service or product etc. Doing it all in batch tends to be a big waste of computation resources. In our system, for example, we serve them in real time (as a user arrives at a web page, say, our customer hits our API for recs), so we only generate the rec at that time. You can take a look at Oryx for this (https://github.com/cloudera/oryx); though it does not yet support Spark, you may be able to save the model into the correct format in HDFS and have Oryx read the data.

2. If you do need to make the recs in batch, then I would suggest the following (see the sketch after this message):
(a) because you have few items, collect the item vectors and form a matrix.
(b) broadcast that matrix
(c) do a mapPartitions on the user vectors. Form a user matrix from the vectors in each partition (maybe create quite a few partitions to make each user matrix not too big)
(d) call value on the broadcast item matrix
(e) now for each partition you have the (small) item matrix and a (larger) user matrix. Do a matrix multiply and you end up with a (U x I) matrix with the scores for each user in the partition. Because you are using BLAS here, it will be significantly faster than individually computed dot products
(f) sort the scores for each user and take the top K
(g) save or collect and do whatever with the scores

3. In conjunction with (2) you can try throwing more resources at the problem too.

If you access the underlying Breeze vectors (I think the toBreeze method is private so you may have to re-implement it), you can do all this using Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot). Hope that helps Nick

On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote: Yes, that's what prediction should be doing: taking dot products or the sigmoid function for each user,item pair. For 1 million users and 10K items there are 10 billion pairs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
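[Editor's note] A rough sketch of steps (a)-(g) above using Breeze, assuming the user and item factors are available as RDDs of (id, Array[Double]); all names are illustrative:

    import breeze.linalg.DenseMatrix
    import org.apache.spark.rdd.RDD

    def batchRecommend(
        userFactors: RDD[(Int, Array[Double])],
        itemFactors: RDD[(Int, Array[Double])],
        k: Int): RDD[(Int, Seq[(Int, Double)])] = {

      // (a) few items: collect the item vectors into one local matrix (I x rank).
      val items = itemFactors.collect()
      val itemIds = items.map(_._1)
      val itemMat = DenseMatrix(items.map(_._2): _*)

      // (b) broadcast the item matrix to all workers.
      val itemMatBc = userFactors.context.broadcast(itemMat)

      // (c)-(e) per partition: stack user vectors into a U x rank matrix,
      // then one BLAS multiply yields the full U x I score matrix.
      userFactors.mapPartitions { users =>
        val userArr = users.toArray
        val userMat = DenseMatrix(userArr.map(_._2): _*)
        val scores = userMat * itemMatBc.value.t

        userArr.iterator.zipWithIndex.map { case ((userId, _), row) =>
          // (f) top-K scored items for this user.
          val topK = (0 until scores.cols)
            .map(col => (itemIds(col), scores(row, col)))
            .sortBy(-_._2)
            .take(k)
          (userId, topK)
        }
      }
      // (g) the caller can save or collect the result.
    }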
data locality
I have a standalone Spark cluster and an HDFS cluster which share some nodes. When reading an HDFS file, how does Spark assign tasks to nodes? Will it ask HDFS for the location of each file block in order to pick the right worker node? How about a Spark cluster on YARN? Thank you very much!
Re: Is there a way to get previous/other keys' state in Spark Streaming?
Good to know! I am bumping the priority of this issue in my head. Thanks for the feedback. Others seeing this thread, please comment if you think that this is an important issue for you as well. Not at my computer right now, but I will make a JIRA for this. TD

On Jul 17, 2014 11:22 PM, Yan Fang yanfang...@gmail.com wrote: Thank you, TD. This is important information for us. Will keep an eye on that. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 6:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Yes, this is the limitation of the current implementation. But this will be improved a lot when we have IndexedRDD (https://github.com/apache/spark/pull/1297) in Spark, which allows faster single-value updates to a key-value store (within each partition, without processing the entire partition). Soon. TD

On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you. Yes, it behaves as you described. Sorry for missing this point. Then my only concern is on the performance side - since Spark Streaming operates on all the keys every time a new batch comes, I think it is fine when the state size is small. When the state size becomes big, say, a few GBs, if we still go through the whole key list, would the operation be a little inefficient then? Maybe I am missing some points in Spark Streaming that handle this situation. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The updateFunction given in updateStateByKey should be called on ALL the keys that are in the state, even if there is no new data in the batch for some key. Is that not the behavior you see? What do you mean by "show all the existing states"? You have access to the latest state RDD by doing stateStream.foreachRDD(...). There you can do whatever operation on all the key-state pairs. TD

On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you for the quick reply and backing my approach. :) 1) The example is this: 1. In the first 2-second interval, after updateStateByKey, I get a few keys and their states, say, (a -> 1, b -> 2, c -> 3). 2. In the following 2-second interval, I only receive c and d and their values. But I want to update/display the state of a and b accordingly. * It seems I have no way to access a and b and get their states. * Also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I need to bundle all the rest of the keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B is present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
Re: data locality
Hi Haopu, Spark will ask HDFS for file block locations and try to assign tasks based on these. There is a snag. Spark schedules its tasks inside of executor processes that stick around for the lifetime of a Spark application. Spark requests executors before it runs any jobs, i.e. before it has any information about where the input data for the jobs is located. If the executors occupy significantly fewer nodes than exist in the cluster, it can be difficult for Spark to achieve data locality. The workaround for this is an API that allows passing in a set of preferred locations when instantiating a Spark context. This API is currently broken in Spark 1.0, and will likely be changed to something a little simpler in a future release.

    val locData = InputFormatInfo.computePreferredLocations(
      Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
    val sc = new SparkContext(conf, locData)

-Sandy

On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote: I have a standalone Spark cluster and an HDFS cluster which share some nodes. When reading an HDFS file, how does Spark assign tasks to nodes? Will it ask HDFS for the location of each file block in order to pick the right worker node? How about a Spark cluster on YARN? Thank you very much!
Re: Large scale ranked recommendation
And you might want to apply clustering before. It is likely that every user and every item are not unique. Bertrand Dechoux

On Fri, Jul 18, 2014 at 9:13 AM, Nick Pentreath nick.pentre...@gmail.com wrote: It is very true that making predictions in batch for all 1 million users against the 10k items will be quite onerous in terms of computation. I have run into this issue too when making batch predictions. Some ideas:

1. Do you really need to generate recommendations for each user in batch? How are you serving these recommendations? In most cases, you only need to make recs when a user is actively interacting with your service or product etc. Doing it all in batch tends to be a big waste of computation resources. In our system, for example, we serve them in real time (as a user arrives at a web page, say, our customer hits our API for recs), so we only generate the rec at that time. You can take a look at Oryx for this (https://github.com/cloudera/oryx); though it does not yet support Spark, you may be able to save the model into the correct format in HDFS and have Oryx read the data.

2. If you do need to make the recs in batch, then I would suggest:
(a) because you have few items, collect the item vectors and form a matrix.
(b) broadcast that matrix
(c) do a mapPartitions on the user vectors. Form a user matrix from the vectors in each partition (maybe create quite a few partitions to make each user matrix not too big)
(d) call value on the broadcast item matrix
(e) now for each partition you have the (small) item matrix and a (larger) user matrix. Do a matrix multiply and you end up with a (U x I) matrix with the scores for each user in the partition. Because you are using BLAS here, it will be significantly faster than individually computed dot products
(f) sort the scores for each user and take the top K
(g) save or collect and do whatever with the scores

3. In conjunction with (2) you can try throwing more resources at the problem too.

If you access the underlying Breeze vectors (I think the toBreeze method is private so you may have to re-implement it), you can do all this using Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot). Hope that helps Nick

On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote: Yes, that's what prediction should be doing: taking dot products or the sigmoid function for each user,item pair. For 1 million users and 10K items there are 10 billion pairs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Last step of processing is using too much memory.
Well, for what it's worth, I found the issue after spending the whole night running experiments ;). Basically, I needed to pass a higher number of partitions to groupByKey. I was simply using the default, which generated only 4 partitions, and so the whole thing blew up. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Last-step-of-processing-is-using-too-much-memory-tp10134p10147.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
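[Editor's note] For reference, the fix amounts to passing an explicit partition count to groupByKey (or raising spark.default.parallelism); a minimal illustration with made-up data:

    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))

    // The default can yield a handful of huge post-shuffle partitions.
    val grouped = pairs.groupByKey()

    // An explicit count spreads the grouped data across more, smaller tasks.
    val groupedWide = pairs.groupByKey(numPartitions = 200)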
RE: data locality
Sandy, do you mean the "preferred location" is working for standalone clusters also? Because I checked the code of SparkContext and see the comments below:

    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
    // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
    // contains a map from hostname to a list of input format splits on the host.
    private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

BTW, even with the preferred hosts, how does Spark decide how many total executors to use for this application? Thanks again!

From: Sandy Ryza [mailto:sandy.r...@cloudera.com] Sent: Friday, July 18, 2014 3:44 PM To: user@spark.apache.org Subject: Re: data locality

Hi Haopu, Spark will ask HDFS for file block locations and try to assign tasks based on these. There is a snag. Spark schedules its tasks inside of executor processes that stick around for the lifetime of a Spark application. Spark requests executors before it runs any jobs, i.e. before it has any information about where the input data for the jobs is located. If the executors occupy significantly fewer nodes than exist in the cluster, it can be difficult for Spark to achieve data locality. The workaround for this is an API that allows passing in a set of preferred locations when instantiating a Spark context. This API is currently broken in Spark 1.0, and will likely be changed to something a little simpler in a future release.

    val locData = InputFormatInfo.computePreferredLocations(
      Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
    val sc = new SparkContext(conf, locData)

-Sandy

On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote: I have a standalone Spark cluster and an HDFS cluster which share some nodes. When reading an HDFS file, how does Spark assign tasks to nodes? Will it ask HDFS for the location of each file block in order to pick the right worker node? How about a Spark cluster on YARN? Thank you very much!
incompatible local class serialVersionUID with spark Shark
Hello, I want to run Shark on YARN. My environment: Shark 0.9.1, Spark 1.0.0, Hadoop 2.3.0. My first question is: is it possible to run Shark 0.9.1 with Spark 1.0.0 on YARN, or do Shark and Spark necessarily have to be the same version? For the moment, when I run a request like show tables or show databases on Shark, it works well. But when I run any select query (like select * from test), I get this error:

    org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times (most recent failure: Exception failure: java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = -14123687772669932, local class serialVersionUID = -6531051710822689816)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    FAILED: Execution Error, return code -101 from shark.execution.SparkTask

Any ideas? I have looked for a solution all week, but without success. Any suggestions will be appreciated. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/incompatible-local-class-serialVersionUID-with-spark-Shark-tp10149.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
concurrent jobs
Looking at the code of JobScheduler, I found the parameters below:

    private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
    private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

Does that mean each app can have only one active stage? In my pseudo-code below:

    S1 = viewDStream.forEach( collect() )..
    S2 = viewDStream.forEach( collect() )..

There should be two "collect()" jobs for each batch interval, right? Are they running in parallel? Thank you!
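[Editor's note] For what it's worth, that default of 1 means the two jobs of a batch run one after the other; the (undocumented) setting can be raised so they run concurrently. A sketch, with the app name and batch interval made up:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("TwoOutputsPerBatch")
      // Undocumented knob read by JobScheduler; defaults to 1 job at a time.
      .set("spark.streaming.concurrentJobs", "2")
    val ssc = new StreamingContext(conf, Seconds(2))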
Re: GraphX Pregel implementation
Thanks On Fri, Jul 18, 2014 at 12:22 AM, Ankur Dave ankurd...@gmail.com wrote: If your sendMsg function needs to know the incoming messages as well as the vertex value, you could define VD to be a tuple of the vertex value and the last received message. The vprog function would then store the incoming messages into the tuple, allowing sendMsg to access them. For example, if the vertex value was a String and the message type was an Int, you could call Pregel as follows:

    val graph: Graph[String, _] = ...
    graph.mapVertices((id, attr) => (attr, 0)).pregel(0)(
      (id, attr: (String, Int), msg: Int) => (attr._1, msg),
      edge => Iterator(...), // can use edge.srcAttr._2 and edge.dstAttr._2 to access the messages
      (a: Int, b: Int) => a + b)

Ankur http://www.ankurdave.com/
Re: ClassNotFoundException: $line11.$read$ when loading an HDFS text file with SparkQL in spark-shell
Hi, Yes, the error still occurs when we replace the lambdas with named functions: (same error traces as in previous posts) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-line11-read-when-loading-an-HDFS-text-file-with-SparkQL-in-spark-shell-tp9954p10154.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY
Hi again! I am having problems when using GROUP BY on both SQLContext and HiveContext (same problem). My code (simplified as much as possible) can be seen here: http://pastebin.com/33rjW67H In short, I'm getting data from a Cassandra store with Datastax's new driver (which works great by the way, recommended!), and mapping it to a Spark SQL table through a Product class (Dokument in the source). Regular SELECTs and stuff work fine, but once I try to do a GROUP BY, I get the following error:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:25 failed 4 times, most recent failure: Exception failure in TID 63 on host 192.168.121.132: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0
        org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158)
        org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64)
        org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195)
        org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        scala.collection.AbstractIterator.to(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

What am I doing wrong? -- Best regards, Martin Gammelsæter
spark sql left join gives KryoException: Buffer overflow
Hi, We have a query with a left join and got this error:

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1

It looks like Spark SQL tried to do a broadcast join and collect one of the tables to the master, but it is too large. How do we explicitly control join behavior like this? -- Pei-Lun Lee
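[Editor's note] One workaround -- an assumption on my part, not something confirmed in this thread -- is to raise Kryo's serialization buffer so the rows being collected fit; a sketch using the 1.0.x-era property name:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Size in MB of Kryo's serialization buffer (1.0.x-era name; later
      // releases split this into buffer and buffer.max settings).
      .set("spark.kryoserializer.buffer.mb", "64")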
What is shuffle spill to memory?
Hi, in the Spark UI, one of the metrics is shuffle spill (memory). What is it exactly? I get spilling to disk when the shuffle data doesn't fit in memory, but what does it mean to spill to memory? Thanks, - Sebastien
Re: Error with spark-submit (formatting corrected)
Hi, Instead of spark://10.1.3.7:7077, use spark://vmsparkwin1:7077. Try this:

    $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://vmsparkwin1:7077 --executor-memory 1G --total-executor-cores 2 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10

Thanks Regards, Meethu M

On Friday, 18 July 2014 7:51 AM, Jay Vyas jayunit100.apa...@gmail.com wrote: I think I know what is happening to you. I've looked into this just this week, so it's fresh in my brain :) hope this helps. When no workers are known to the master, iirc, you get this message. I think this is how it works. 1) You start your master. 2) You start a slave, and give it the master URL as an argument. 3) The slave then binds to a random port. 4) The slave then does a handshake with the master, which you can see in the slave logs (it says something like "successfully connected to master at ..."). Actually, I think the master also logs that it is now aware of a slave running on ip:port. So in your case, I suspect, none of the slaves have connected to the master, so the job sits idle. This is similar to the YARN scenario of submitting a job to a resource manager with no node managers running.

On Jul 17, 2014, at 6:57 PM, ranjanp piyush_ran...@hotmail.com wrote: Hi, I am new to Spark and trying it out with a stand-alone, 3-node (1 master, 2 workers) cluster. From the Web UI at the master, I see that the workers are registered. But when I try running the SparkPi example from the master node, I get the following message and then an exception.

    14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077...
    14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

I searched a bit for the above warning and found that others have encountered this problem before, but did not see a clear resolution except for this link: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444 Based on the suggestion there I tried supplying the --executor-memory option to spark-submit, but that did not help. Any suggestions? Here are the details of my setup: - 3 nodes (each with 4 CPU cores and 7 GB memory) - 1 node configured as master, and the other two configured as workers - Firewall is disabled on all nodes, and network communication between the nodes is not a problem - Edited conf/spark-env.sh on all nodes to set the following: SPARK_WORKER_CORES=3 SPARK_WORKER_MEMORY=5G - The Web UI as well as logs on the master show that the workers were able to register correctly.
Also the Web UI correctly shows the aggregate available memory and CPU cores on the workers:

    URL: spark://vmsparkwin1:7077
    Workers: 2
    Cores: 6 Total, 0 Used
    Memory: 10.0 GB Total, 0.0 B Used
    Applications: 0 Running, 0 Completed
    Drivers: 0 Running, 0 Completed
    Status: ALIVE

I tried running the SparkPi example first using run-example (which was failing) and later directly using spark-submit as shown below:

    $ export MASTER=spark://vmsparkwin1:7077
    $ echo $MASTER
    spark://vmsparkwin1:7077
    azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
    $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10

The following is the full screen output:

    14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser
    14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(azureuser)
    14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started
    14/07/17 01:20:14 INFO Remoting: Starting remoting
    14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
    14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
    14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker
    14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster
    14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at C:\cygwin\tmp\spark-local-20140717012014-b606
    14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9 MB.
    14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842)
    14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager
    14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager
Dividing tasks among Spark workers
I am running my program on a Spark cluster, but when I look at my UI while the job is running, I see that only one worker does most of the tasks. My cluster has one master and 4 workers, where the master is also a worker. I want my task to complete as quickly as possible and I believe that if the tasks were divided equally among the workers, the job would be completed faster. Is there any way I can customize the number of tasks on each worker? http://apache-spark-user-list.1001560.n3.nabble.com/file/n10160/Question.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dividing-tasks-among-Spark-workers-tp10160.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
Hi Svend, Your reply is very helpful to me. I'll keep an eye on that ticket. And also... Cheers :) Best Regards, Victor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10162.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: how to pass extra Java opts to workers for spark streaming jobs
Thanks Andrew, I tried it and it works.

On Fri, Jul 18, 2014 at 12:53 AM, Andrew Or and...@databricks.com wrote: You will need to include that in the SPARK_JAVA_OPTS environment variable, so add the following line to spark-env.sh:

    export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"

This should propagate to the executors. (Though you should double check, since 0.9 is a little old and I could be forgetting something.) If you wish to add Spark options in addition to this, simply append them to the environment variable:

    export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC -Dspark.config.one=value -Dspark.config.two=value"

(Please note that this is only for Spark 0.9. The practice of setting Spark options within SPARK_JAVA_OPTS is deprecated as of 1.0.)

2014-07-17 21:08 GMT-07:00 Chen Song chen.song...@gmail.com: Thanks Andrew. Say that I want to turn on CMS GC for each worker. All I need to do is add the following line to conf/spark-env.sh on the node where I submit the application: -XX:+UseConcMarkSweepGC Is that correct? Will this option be propagated to each worker in YARN?

On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote: Hi Chen, spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark 0.9. You need to export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2" in conf/spark-env.sh. Let me know if that works. Andrew

2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com: Can you check in the environment tab of the Spark web UI to see whether this configuration parameter is in effect? TD

On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com wrote: I am using Spark 0.9.0 and I am able to submit jobs to YARN, https://spark.apache.org/docs/0.9.0/running-on-yarn.html. I am trying to turn on GC logging on executors but could not find a way to set extra Java opts for workers. I tried to set spark.executor.extraJavaOptions but that did not work. Any idea on how I should do this? -- Chen Song -- Chen Song -- Chen Song
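[Editor's note] For readers on Spark 1.0+, where setting Spark options through SPARK_JAVA_OPTS is deprecated, the per-process JVM flags move to configuration properties; a sketch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // JVM flags for executors (Spark 1.0+); don't put -Dspark.* options here.
      .set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+UseConcMarkSweepGC")
      .set("spark.driver.extraJavaOptions", "-XX:+UseConcMarkSweepGC")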
Re: spark streaming rate limiting from kafka
Thanks Tathagata, It would be awesome if Spark Streaming could support limiting the receiving rate in general. I tried to explore the link you provided but could not find the specific JIRA related to this. Do you have the JIRA number?

On Thu, Jul 17, 2014 at 9:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple Kafka streams to partition your topics across them, which will run multiple receivers on multiple executors. This is covered in the Spark Streaming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. It's in the master branch, and will be available in Spark 1.1. It basically sets the limit at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver. TD

On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias

On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even if I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job?

On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias.

On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via offset, for example)? By setting this to a small number, it would force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song -- Chen Song
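[Editor's note] For reference, the receiver-level limit TD mentions landed as the spark.streaming.receiver.maxRate property (records per second per receiver); a sketch, assuming Spark 1.1+:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Max records per second each receiver will accept; unset means unlimited.
      .set("spark.streaming.receiver.maxRate", "10000")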
Re: Distribute data from Kafka evenly on cluster
Speaking of this, I have another related question. In my Spark Streaming job, I set up multiple consumers to receive data from Kafka, with each worker reading from one partition. Initially, Spark is intelligent enough to associate each worker with one partition, making data consumption distributed. After running for a while, the consumers rebalance themselves and some workers start reading partitions which were with others. This leads to a situation where some workers read from multiple partitions and some don't read at all. Because of the data volume, this causes heap pressure on some workers. Any thoughts on why the rebalance is triggered and how to monitor and avoid it?

On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, unfortunately, when I use the above approach, I run into this problem: http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver will retry again and again, but will eventually fail, leading to unprocessed data and, worse, the task never terminating. There is nothing exotic about my setup: one Zookeeper node, one Kafka broker. So I am wondering if other people have seen this error before and, more importantly, how to fix it. When I don't use the approach of multiple kafkaStreams, I don't get this error, but then work is never distributed in my cluster... Thanks Tobias

On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp wrote: Thank you very much for the link, that was very helpful! So, apparently the `topics: Map[String, Int]` parameter controls the number of partitions that the data is initially added to; the number N in

    val kafkaInputs = (1 to N).map { _ =>
      ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1))
    }
    val union = ssc.union(kafkaInputs)

controls how many connections are made to Kafka. Note that the number of Kafka partitions for that topic must be at least N for this to work. Thanks Tobias -- Chen Song
Re: Dividing tasks among Spark workers
The default # of partitions is the # of cores, correct?

On 7/18/14, 10:53 AM, Yanbo Liang wrote: Check how many partitions there are in your program. If there is only one, changing it to more partitions will make the execution parallel.

2014-07-18 20:57 GMT+08:00 Madhura das.madhur...@gmail.com: I am running my program on a Spark cluster, but when I look at my UI while the job is running, I see that only one worker does most of the tasks. My cluster has one master and 4 workers, where the master is also a worker. I want my task to complete as quickly as possible and I believe that if the tasks were divided equally among the workers, the job would be completed faster. Is there any way I can customize the number of tasks on each worker? http://apache-spark-user-list.1001560.n3.nabble.com/file/n10160/Question.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dividing-tasks-among-Spark-workers-tp10160.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Distribute data from Kafka evenly on cluster
Hi, as far as I know, the rebalance is triggered by Kafka in order to distribute partitions evenly -- that is, to achieve the opposite of what you are seeing. I think it would be interesting to check the Kafka logs for the result of the rebalance operation and why you see what you are seeing. I know that the client logs say which partitions of a topic were assigned to a particular consumer; maybe you can have a look. Tobias

On Fri, Jul 18, 2014 at 11:42 PM, Chen Song chen.song...@gmail.com wrote: Speaking of this, I have another related question. In my Spark Streaming job, I set up multiple consumers to receive data from Kafka, with each worker reading from one partition. Initially, Spark is intelligent enough to associate each worker with one partition, making data consumption distributed. After running for a while, the consumers rebalance themselves and some workers start reading partitions which were with others. This leads to a situation where some workers read from multiple partitions and some don't read at all. Because of the data volume, this causes heap pressure on some workers. Any thoughts on why the rebalance is triggered and how to monitor and avoid it?

On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, unfortunately, when I use the above approach, I run into this problem: http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver will retry again and again, but will eventually fail, leading to unprocessed data and, worse, the task never terminating. There is nothing exotic about my setup: one Zookeeper node, one Kafka broker. So I am wondering if other people have seen this error before and, more importantly, how to fix it. When I don't use the approach of multiple kafkaStreams, I don't get this error, but then work is never distributed in my cluster... Thanks Tobias

On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp wrote: Thank you very much for the link, that was very helpful! So, apparently the `topics: Map[String, Int]` parameter controls the number of partitions that the data is initially added to; the number N in

    val kafkaInputs = (1 to N).map { _ =>
      ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1))
    }
    val union = ssc.union(kafkaInputs)

controls how many connections are made to Kafka. Note that the number of Kafka partitions for that topic must be at least N for this to work. Thanks Tobias -- Chen Song
Re: Dividing tasks among Spark workers
Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of "map" tasks to run on each file according to its size. You can pass the level of parallelism as a second argument or set the config property *spark.default.parallelism* to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster. For example, the following code sets the partition count of the data to 10 so it will be processed in parallel:

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data, 10)

2014-07-18 23:00 GMT+08:00 Shannon Quinn squ...@gatech.edu: The default # of partitions is the # of cores, correct? On 7/18/14, 10:53 AM, Yanbo Liang wrote: Check how many partitions there are in your program. If there is only one, changing it to more partitions will make the execution parallel.

2014-07-18 20:57 GMT+08:00 Madhura das.madhur...@gmail.com: I am running my program on a Spark cluster, but when I look at my UI while the job is running, I see that only one worker does most of the tasks. My cluster has one master and 4 workers, where the master is also a worker. I want my task to complete as quickly as possible and I believe that if the tasks were divided equally among the workers, the job would be completed faster. Is there any way I can customize the number of tasks on each worker? http://apache-spark-user-list.1001560.n3.nabble.com/file/n10160/Question.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dividing-tasks-among-Spark-workers-tp10160.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming Json file groupby function
Hi, I am able to save to a local file the RDDs generated by Spark SQL queries over data coming from Spark Streaming. If I set the streaming context batch interval to 10 sec, only the data arriving in that 10-sec window is processed by my SQL and stored in the location I specified, and for the next set of data (the next batch) it errors because the save folder already exists. So I increased my streaming context duration to 100 sec; with this, the data that comes in the 100-sec window is processed at once, but the output goes to several files in that folder, like 10 different files (part-0001, part-0002, ...), each having one or two records. I want to save those files to a single file. Please let me know if there is any workaround for this. The code that I am using:

    case class Record(ID: String, name: String, score: Int, school: String)
    case class OutPut(name: String, score: String)

    object KafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
        //StreamingExamples.setStreamingLogLevels()
        val datenow = new Date()
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(100))
        val sqlContext = new SQLContext(sc)
        val timer = Time(10)
        ssc.remember(Seconds(100))
        //val timenow = new java.util.Date
        import sqlContext._
        val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val jsonf = lines.map(JSON.parseFull(_))
          .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
        val fields = jsonf.map(data => Record(data("ID").toString,
          data("name").toString, data("score").toString.toInt, data("school").toString))
        fields.print()
        //fields.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + timenow)
        val results = fields.foreachRDD((recrdd, timer) => {
          recrdd.registerAsTable("table1")
          val sqlreport = sqlContext.sql("select max(score) from table1 where ID = 'math' and score > 50")
          sqlreport.map(t => OutPut(t(0).toString, t(1).toString)).collect().foreach(println)
          //println(sqlreport)
          //sqlreport.foreach(println)
          sqlreport.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + datenow)
        })
        //results.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

Thanks, -Srinivas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10170.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Python: saving/reloading RDD
Hello, Just to make sure I correctly read the docs and the forums: it's my understanding that currently in Python with Spark 1.0.1 there is no way to save my RDD to disk such that I can just reload it. The Hadoop RDD save/load APIs are not yet present in Python. Is that correct? I just want to make sure that's the case before I write a workaround. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Large scale ranked recommendation
Nick's suggestion is a good approach for your data. The item factors to broadcast should be a few MBs. -Xiangrui

On Jul 18, 2014, at 12:59 AM, Bertrand Dechoux decho...@gmail.com wrote: And you might want to apply clustering before. It is likely that every user and every item are not unique. Bertrand Dechoux

On Fri, Jul 18, 2014 at 9:13 AM, Nick Pentreath nick.pentre...@gmail.com wrote: It is very true that making predictions in batch for all 1 million users against the 10k items will be quite onerous in terms of computation. I have run into this issue too when making batch predictions. Some ideas:

1. Do you really need to generate recommendations for each user in batch? How are you serving these recommendations? In most cases, you only need to make recs when a user is actively interacting with your service or product etc. Doing it all in batch tends to be a big waste of computation resources. In our system, for example, we serve them in real time (as a user arrives at a web page, say, our customer hits our API for recs), so we only generate the rec at that time. You can take a look at Oryx for this (https://github.com/cloudera/oryx); though it does not yet support Spark, you may be able to save the model into the correct format in HDFS and have Oryx read the data.

2. If you do need to make the recs in batch, then I would suggest:
(a) because you have few items, collect the item vectors and form a matrix.
(b) broadcast that matrix
(c) do a mapPartitions on the user vectors. Form a user matrix from the vectors in each partition (maybe create quite a few partitions to make each user matrix not too big)
(d) call value on the broadcast item matrix
(e) now for each partition you have the (small) item matrix and a (larger) user matrix. Do a matrix multiply and you end up with a (U x I) matrix with the scores for each user in the partition. Because you are using BLAS here, it will be significantly faster than individually computed dot products
(f) sort the scores for each user and take the top K
(g) save or collect and do whatever with the scores

3. In conjunction with (2) you can try throwing more resources at the problem too.

If you access the underlying Breeze vectors (I think the toBreeze method is private so you may have to re-implement it), you can do all this using Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot). Hope that helps Nick

On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote: Yes, that's what prediction should be doing: taking dot products or the sigmoid function for each user,item pair. For 1 million users and 10K items there are 10 billion pairs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Python: saving/reloading RDD
You can save RDDs to text files using RDD.saveAsTextFile and load them back using sc.textFile. But make sure the record-to-string conversion is correctly implemented if the type is not primitive, and that you have a parser to load the records back. -Xiangrui On Jul 18, 2014, at 8:39 AM, Roch Denis rde...@exostatic.com wrote: Hello, Just to make sure I correctly read the docs and the forums: it's my understanding that currently in Python with Spark 1.0.1 there is no way to save my RDD to disk such that I can just reload it. The Hadoop RDD save/load APIs are not yet present in Python. Is that correct? I just want to make sure that's the case before I write a workaround. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Python: saving/reloading RDD
+1, had to learn this the hard way when some of my objects were written as pointers, rather than translated correctly to strings :) On 7/18/14, 11:52 AM, Xiangrui Meng wrote: You can save RDDs to text files using RDD.saveAsTextFile and load them back using sc.textFile. But make sure the record-to-string conversion is correctly implemented if the type is not primitive, and that you have a parser to load the records back. -Xiangrui On Jul 18, 2014, at 8:39 AM, Roch Denis rde...@exostatic.com wrote: Hello, Just to make sure I correctly read the docs and the forums: it's my understanding that currently in Python with Spark 1.0.1 there is no way to save my RDD to disk such that I can just reload it. The Hadoop RDD save/load APIs are not yet present in Python. Is that correct? I just want to make sure that's the case before I write a workaround. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: the default GraphX graph-partition strategy on multicore machine?
Hi Ankur, Thanks so much! :)) Yes -- is it possible to define a custom partition strategy? And some other questions (2x4-core machine, 24 GB memory):

- If I load one edges file (5 GB) without any cores/partitions setting, what is the default partitioning during graph construction, and how many cores will be used? And what if the size of the file is 50 GB (more than the available memory), still without a partition setting?

- Regarding "each vertex must be replicated to all partitions where it is referenced": I don't understand. For instance, if we have 3 edge partition tables (EA: a -> b, a -> c; EB: a -> d, a -> e; EC: d -> c) and 2 vertex partition tables (VA: a, b, c; VB: d, e), will the whole vertex table VA be replicated to all these 3 edge partitions, since each of them refers to some vertices in VA?

- Regarding "there is no shared-memory parallelism": do you mean that each core is restricted to accessing only its own partition in memory? How do they communicate when the required data (edges?) is in another partition?

On Jul 15, 2014, at 9:30 PM, Ankur Dave ankurd...@gmail.com wrote: On Jul 15, 2014, at 12:06 PM, Yifan LI iamyifa...@gmail.com wrote: Btw, is there any possibility to customise the partition strategy as we expect? I'm not sure I understand. Are you asking about defining a custom partition strategy?

On Tue, Jul 15, 2014 at 6:20 AM, Yifan LI iamyifa...@gmail.com wrote: when I load the file using sc.textFile (minPartitions = 16, PartitionStrategy.RandomVertexCut) The easiest way to load the edge file would actually be to use GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut).

1) how much data will be loaded into memory? The exact size of the graph (vertices + edges) in memory depends on the graph's structure, the partition function, and the average vertex degree, because each vertex must be replicated to all partitions where it is referenced. It's easier to estimate the size of just the edges, which I did on the mailing list a while ago. To summarize, during graph construction each edge takes 60 bytes, and once the graph is constructed it takes 20 bytes.

2) how many partitions will be stored in memory? Once you call cache() on the graph, all 16 partitions will be stored in memory. You can also tell GraphX to spill them to disk in case of memory pressure by passing edgeStorageLevel=StorageLevel.MEMORY_AND_DISK to GraphLoader.edgeListFile.

3) Will the thread/task on each core read only one edge from memory at a time and then compute on it? Yes, each task is single-threaded, so it will read the edges in its partition sequentially. 3.1) which edge in memory is chosen to be read into cache? In theory, each core should independently read the edges in its partition into its own cache. Cache issues should be much less of a concern than in most applications because different tasks (cores) operate on independent partitions; there is no shared-memory parallelism. The tradeoff is heavier reliance on shuffles. 3.2) how are those partitions being scheduled? Spark handles the scheduling. There are details in Section 5.1 of the Spark NSDI paper; in short, tasks are scheduled to run on the same machine as their data partition, and by default each machine can accept at most one task per core. Ankur
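[Editor's note] On the custom-strategy question: in GraphX, PartitionStrategy is a trait with a single method, so you can plug in your own (assuming your GraphX version allows extending it, as recent ones do). A minimal sketch -- the source-hash policy and file name are illustrative assumptions:

    import org.apache.spark.graphx._

    // Assign each edge to a partition by hashing only its source vertex, so
    // all out-edges of a vertex land in the same partition (illustrative policy).
    object SourceHashPartition extends PartitionStrategy {
      override def getPartition(src: VertexId, dst: VertexId,
          numParts: PartitionID): PartitionID =
        (math.abs(src) % numParts).toInt
    }

    val graph = GraphLoader.edgeListFile(sc, "edges.txt", minEdgePartitions = 16)
      .partitionBy(SourceHashPartition)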
Re: Python: saving/reloading RDD
Yeah, but I would still have to do a map pass with an ast.literal_eval() for each line, correct? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172p10179.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: What is shuffle spill to memory?
Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time when we spill it, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after we spill it. This is why the latter tends to be much smaller than the former. Note that both metrics are aggregated over the entire duration of the task (i.e. within each task you can spill multiple times). Andrew

2014-07-18 4:09 GMT-07:00 Sébastien Rainville sebastienrainvi...@gmail.com: Hi, in the Spark UI, one of the metrics is shuffle spill (memory). What is it exactly? I get spilling to disk when the shuffle data doesn't fit in memory, but what does it mean to spill to memory? Thanks, - Sebastien
Re: Need help on Spark UDF (Join) performance tuning
Hello Experts, Your input would be highly appreciated; please suggest or give me a hint: what would be the issue here? Thanks and Regards, Malligarjunan S.

On Thursday, 17 July 2014, 22:47, S Malligarjunan smalligarju...@yahoo.com wrote: Hello Experts, I am facing a performance problem when I use the UDF function call. Please help me to tune the query. Please find the details below:

    shark> select count(*) from table1;
    OK
    151096
    Time taken: 7.242 seconds
    shark> select count(*) from table2;
    OK
    938
    Time taken: 1.273 seconds

Without UDF:

    shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1
        WHERE pvc1.col1 = pvc2.col2
        AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') < unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
    OK
    328
    Time taken: 200.487 seconds
    shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1
        WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2)
        AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') < unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
    OK
    331
    Time taken: 292.86 seconds

With UDF:

    shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1
        WHERE testCompare(pvc1.col1, pvc1.col2, pvc2.col1, pvc2.col2)
        AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') < unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
    OK
    331
    Time taken: 3718.23 seconds

The UDF query above takes far longer to run, where testCompare is a UDF that just does pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2. Please let me know what the issue is here. Thanks and Regards, Sankar S.
Re: Spark Streaming timestamps
Hi Tathagata, On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The RDD parameter in foreachRDD contains raw/transformed data from the last batch. So when foreachRDD is called with the time parameter as 5:02:01 and the batch size is 1 minute, then the rdd will contain data based on the data received between 5:02:00 and 5:02:01. Do you mean the data between 5:02:02 and 5:02:01? The time parameter is 5:02:01. Moreover, when the program is running, it is very difficult to specify a starting time because sometimes it is difficult to know when the program executes that line. And do we need a different time parameter for each foreachRDD, or will Spark calculate the next one according to the batch? If you want to do custom intervals, then I suggest the following: 1. Use 1 second batch intervals. 2. Then in the foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in an ArrayBuffer/ListBuffer. 3. At 5:03:29, add the RDD to the buffer, do a union of all the buffered RDDs, and process them. So in foreachRDD, based on the time, buffer the RDDs until you reach the appropriate time. Then union all the buffered RDDs and process them. TD On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Thanks for your answer. Please see my further question below: On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Answers inline. On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently using Spark Streaming to conduct real-time data analytics. We receive data from Kafka. We want to generate output files that contain results based on the data we receive from a specific time interval. I have several questions on Spark Streaming's timestamps: 1) If I use saveAsTextFiles, it seems Spark Streaming will generate files in complete minutes, such as 5:00:01, 5:00:02 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just mean the time the files are generated? The file named 5:00:01 contains results from data received between 5:00:00 and 5:00:01 (based on the system time of the cluster). 2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results? There is a version of foreachRDD which allows you to specify a function that takes in a Time object. 3) How can we specify the starting time of the batches? What do you mean? Batches are timed based on the system time of the cluster. I would like to control the starting time and ending time of each batch. For example, if I use saveAsTextFiles as the output method and the batch size is 1 minute, Spark will align time intervals to complete minutes, such as 5:01:00, 5:02:00, 5:03:00. It will not have results at 5:01:03, 5:02:03, 5:03:03, etc. My goal is to generate output for a customized interval such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc. I checked the API of foreachRDD with the time parameter. It seems there is no explanation of what that parameter means. Does it mean the starting time of the first batch? Thanks! Bill
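A rough sketch of the buffering approach TD describes, to make the mechanics concrete (untested; isBoundary and process are hypothetical helpers, and the element type of the stream is assumed):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val buffered = new ArrayBuffer[RDD[(String, Int)]]()
stream.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  buffered += rdd                           // keep every 1-second batch
  if (isBoundary(time)) {                   // e.g. true when time hits hh:mm:29
    val window = buffered.reduce(_ union _) // union all buffered batches
    process(window, time)                   // custom output for the custom interval
    buffered.clear()
  }
})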
Re: Large scale ranked recommendation
Thanks Nick. The real-time suggestion is good; we will see if we can add that to our deployment strategy, and you are correct that we may not need recommendations for each user. We will try adding more resources and the broadcasting-item-features suggestion, as currently the item features don't seem to be huge. As both users and items will continue to grow in the future, I think a few GPU nodes will suffice to serve faster recommendations (faster vector computations) after learning the model with Spark. It would be great to have built-in GPU support in Spark, to leverage the GPU capability of nodes for performing these flops faster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: MLLib - Regularized logistic regression in python
Thanks for all your helpful replies. Best, Francisco -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-Regularized-logistic-regression-in-python-tp9780p10184.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Large scale ranked recommendation
Agree GPUs may be interesting for this kind of massively parallel linear algebra on reasonably sized vectors. These projects might be of interest in this regard: https://github.com/BIDData/BIDMach https://github.com/BIDData/BIDMat https://github.com/dlwh/gust Nick On Fri, Jul 18, 2014 at 7:40 PM, m3.sharma sharm...@umn.edu wrote: Thanks Nick. The real-time suggestion is good; we will see if we can add that to our deployment strategy, and you are correct that we may not need recommendations for each user. We will try adding more resources and the broadcasting-item-features suggestion, as currently the item features don't seem to be huge. As both users and items will continue to grow in the future, I think a few GPU nodes will suffice to serve faster recommendations (faster vector computations) after learning the model with Spark. It would be great to have built-in GPU support in Spark, to leverage the GPU capability of nodes for performing these flops faster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Decision tree classifier in MLlib
Hi Sudha, Have you checked if the labels are being loaded correctly? It sounds like the DT algorithm can't find any useful splits to make, so maybe it thinks they are all the same? Some data loading functions threshold labels to make them binary. Hope it helps, Joseph On Fri, Jul 11, 2014 at 2:25 PM, SK skrishna...@gmail.com wrote: Hi, I have a small dataset (120 training points, 30 test points) that I am trying to classify into binary classes (1 or 0). The dataset has 4 numerical features and 1 binary label (1 or 0). I used LogisticRegression and SVM in MLLib and I got 100% accuracy in both cases. But when I used DecisionTree, I am getting only 33% accuracy (basically all the predicted test labels are 1 whereas actually only 10 out of the 30 should be 1). I tried modifying the different parameters (maxDepth, bins, impurity etc) and still am able to get only 33% accuracy. I used the same dataset with R's decision tree (rpart) and I am getting 100% accuracy. I would like to understand why the performance of MLLib's decision tree model is poor and if there is some way I can improve it. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-classifier-in-MLlib-tp9457.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
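One quick sanity check along the lines Joseph suggests (a sketch; trainingData is assumed to be the RDD[LabeledPoint] fed to the trainer):

val labelCounts = trainingData.map(_.label).countByValue()
println(labelCounts)  // expect both 0.0 and 1.0 to appear, in roughly the proportions you loaded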
Job aborted due to stage failure: TID x failed for unknown reasons
Hi all, I'm dealing with some strange error messages that I *think* come down to a memory issue, but I'm having a hard time pinning it down and could use some guidance from the experts. I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one has 16GB memory, the other 32GB (which is the master). My application involves computing pairwise pixel affinities in images, though the images I've tested so far only get as big as 1920x1200, and as small as 16x16. I did have to change a few memory and parallelism settings, otherwise I was getting explicit OutOfMemoryExceptions. In spark-defaults.conf: spark.executor.memory 14g spark.default.parallelism 32 spark.akka.frameSize 1000 In spark-env.sh: SPARK_DRIVER_MEMORY=10G With those settings, however, I get a bunch of WARN statements about Lost TIDs (no task is successfully completed), in addition to lost executors, which are repeated 4 times until I finally get the following error message and crash: --- 14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0 14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at /home/user/Programming/PySpark-Affinities/affinity.py:243 Traceback (most recent call last): File "/home/user/Programming/PySpark-Affinities/affinity.py", line 243, in <module> lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]]) File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 583, in collect bytesInJava = self._jrdd.collect().iterator() File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__ File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:13 failed 4 times, most recent failure: *TID 32 on host master.host.univ.edu failed for unknown reason* Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4) 14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor 4 from BlockManagerMaster. 14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in removeExecutor user@master:~/Programming/PySpark-Affinities$ --- If I run the really small image instead (16x16), it *appears* to run to completion (gives me the output I expect without any exceptions being thrown). However, in the stderr logs for the app that was run, it lists the state as KILLED with the final message an ERROR CoarseGrainedExecutorBackend: Driver Disassociated. If I run any larger images, I get the exception I pasted above. Furthermore, if I just do a spark-submit with master=local[*], aside from still needing to set the aforementioned memory options, it will work for an image of *any* size (I've tested both machines independently; they both do this when running as local[*]), whereas working on a cluster will result in the aforementioned crash at stage 0 with anything but the smallest images. Any ideas what is going on? Thank you very much in advance! Regards,
Re: spark streaming rate limiting from kafka
Oops, wrong link! JIRA: https://github.com/apache/spark/pull/945/files Github PR: https://github.com/apache/spark/pull/945/files On Fri, Jul 18, 2014 at 7:19 AM, Chen Song chen.song...@gmail.com wrote: Thanks Tathagata, That would be awesome if Spark Streaming can support a receiving rate limit in general. I tried to explore the link you provided but could not find any specific JIRA related to this. Do you have the JIRA number for this? On Thu, Jul 17, 2014 at 9:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple kafka streams to partition your topics across them, which will run multiple receivers on multiple executors. This is covered in the Spark Streaming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. It's in the master branch, and will be available in Spark 1.1. It basically sets the limits at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver. TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that, after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even if I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via offset, for example)? By setting this to a small number, it will force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song -- Chen Song
Re: Memory compute-intensive tasks
Hi Matei- Changing to coalesce(numNodes, true) still runs all partitions on a single node, which I verified by printing the hostname before I exec the external process. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p10189.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark streaming rate limiting from kafka
Dang! Messed it up again! JIRA: https://issues.apache.org/jira/browse/SPARK-1341 Github PR: https://github.com/apache/spark/pull/945/files On Fri, Jul 18, 2014 at 11:35 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Oops, wrong link! JIRA: https://github.com/apache/spark/pull/945/files Github PR: https://github.com/apache/spark/pull/945/files On Fri, Jul 18, 2014 at 7:19 AM, Chen Song chen.song...@gmail.com wrote: Thanks Tathagata, That would be awesome if Spark Streaming can support a receiving rate limit in general. I tried to explore the link you provided but could not find any specific JIRA related to this. Do you have the JIRA number for this? On Thu, Jul 17, 2014 at 9:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple kafka streams to partition your topics across them, which will run multiple receivers on multiple executors. This is covered in the Spark Streaming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. It's in the master branch, and will be available in Spark 1.1. It basically sets the limits at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver. TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that, after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even if I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via offset, for example)? By setting this to a small number, it will force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song -- Chen Song
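For the single-executor Kafka question in this thread, the pattern from the streaming guide looks roughly like this (a sketch; zkQuorum, groupId, the topic name, and the counts are placeholders):

import org.apache.spark.streaming.kafka.KafkaUtils

val numStreams = 4  // one receiver per stream
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, Map("myTopic" -> 1))
}
val unified = ssc.union(kafkaStreams)
val repartitioned = unified.repartition(8)  // spread processing beyond the receiver executors

And if I'm reading SPARK-1854 right, once 1.1 lands the receiver-level cap TD mentions would be set with something like the spark.streaming.receiver.maxRate property.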
Spark Streaming with long batch / window duration
Would it be a reasonable use case of Spark Streaming to have a very large window size (let's say on the scale of weeks)? In this particular case the reduce function would be invertible, which would aid efficiency. I assume that using a larger batch size, since the window is so large, would also lighten the workload for Spark. The sliding duration is not too important; I just want to know if this is reasonable for Spark to handle with any slide duration. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-long-batch-window-duration-tp10191.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
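Since the reduce function is invertible, the incremental form of the windowed reduce would look something like this (a sketch with made-up durations; the pair-stream element type is assumed, and checkpointing is required for this variant):

import org.apache.spark.streaming.Minutes

ssc.checkpoint("hdfs://.../checkpoints")  // required for the inverse-reduce form
val weekly = pairStream.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,  // fold in data entering the window
  (a: Long, b: Long) => a - b,  // subtract data leaving the window
  Minutes(7 * 24 * 60),         // week-long window
  Minutes(60))                  // hourly slide

With the inverse function, each slide only touches the data entering and leaving the window rather than re-reducing weeks of history.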
Re: the default GraphX graph-partition strategy on multicore machine?
On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote: Yes, is it possible to define a custom partition strategy? Yes, you just need to create a subclass of PartitionStrategy as follows:

import org.apache.spark.graphx._

object MyPartitionStrategy extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // put your hash function here
  }
}

if I load one edge file (5 GB), without any cores/partitions setting, what is the default number of partitions in graph construction? and how many cores will be used? Or, what if the size of the file is 50 GB (more than available memory, without any partition setting)? If you don't specify a number of partitions, it will default to 2 or the number of blocks in the input file, whichever is greater (this is the same behavior as sc.textFile https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1243). I'm not sure exactly how many this will be for your file, but you can find out by running sc.textFile(...).partitions.length. The default partitioning strategy is to leave the edges in the same partitions as they were initially loaded from. For the 50 GB file, everything is the same except the default number of partitions will probably be larger. Hopefully each partition will individually still fit in memory, which will allow GraphX to proceed. will the whole vertex table VA be replicated to all these 3 edge partitions, since each of them refers to some vertices in VA? Yes, for that graph and partitioning, all vertices will be replicated to all edge partitions. It won't be quite as bad for most graphs, and EdgePartition2D http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.PartitionStrategy$$EdgePartition2D$ provides a bound on the replication factor, as described in the docs. You mean that each core is restricted to accessing only its own partition in memory? how do they communicate when the required data (edges?) is in another partition? Right, there is one task per core, and each task only accesses its own partition, never the partitions of other tasks. Tasks communicate in two ways: 1. Vertex replication (multicast). When an edge needs to access its neighboring vertices (to construct the triplets or perform the map step in mapReduceTriplets), GraphX replicates vertices to all edge partitions where they are referenced. 2. Vertex aggregation. When an edge needs to update the value of a neighboring vertex (to perform the reduce step in mapReduceTriplets), GraphX aggregates vertex updates from the edge partitions back to the vertices. Both of these communication steps happen using Spark shuffles, which work by writing messages to disk and notifying other tasks to read them from disk. Note that edges never need to access other edges directly, and all communication is done through the vertices. Ankur http://www.ankurdave.com/
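Once defined, it plugs in the same way as the built-in strategies, e.g.:

val repartitioned = graph.partitionBy(MyPartitionStrategy)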
Re: the default GraphX graph-partition strategy on multicore machine?
Sorry, I didn't read your vertex replication example carefully, so my previous answer is wrong. Here's the correct one: On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote: I don't understand; for instance, we have 3 edge partition tables (EA: a -> b, a -> c; EB: a -> d, a -> e; EC: d -> c), 2 vertex partition tables (VA: a, b, c; VB: d, e). Will the whole vertex table VA be replicated to all these 3 edge partitions, since each of them refers to some vertices in VA? Vertices can be replicated individually without requiring the entire vertex partition to be replicated. In this case, here's what will get replicated to each partition:
EA: a (from VA), b (from VA), c (from VA)
EB: a (from VA), d (from VB), e (from VB)
EC: c (from VA), d (from VB)
Ankur http://www.ankurdave.com/
Re: incompatible local class serialVersionUID with spark Shark
There is no version of Shark that works with Spark 1.0. More details about the path forward here: http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html On Jul 18, 2014 4:53 AM, Megane1994 leumenilari...@yahoo.fr wrote: Hello, I want to run Shark on YARN. My environment: Shark 0.9.1, Spark 1.0.0, Hadoop 2.3.0. My first question is: is it possible to run Shark 0.9.1 with Spark 1.0.0 on YARN, or do Shark and Spark necessarily have to be the same version? For the moment, when I make a request like show tables or show databases in Shark, it works well. But when I make any select query (like select * from test), I get this error: org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times (most recent failure: Exception failure: java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = -14123687772669932, local class serialVersionUID = -6531051710822689816) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) FAILED: Execution Error, return code -101 from shark.execution.SparkTask Any ideas? I looked for a solution all week, but without success. Any suggestions will be appreciated. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/incompatible-local-class-serialVersionUID-with-spark-Shark-tp10149.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
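For what it's worth, the Spark SQL side of that path forward is small; a minimal sketch against Spark 1.0 (assuming hive-site.xml is on the classpath; the table name test is taken from the question, and hql is the 1.0-era method):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.hql("SELECT * FROM test").collect().foreach(println)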
Re: TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY
Sorry for the non-obvious error message. It is not valid SQL to include attributes in the select clause unless they are also in the group by clause or are inside of an aggregate function. On Jul 18, 2014 5:12 AM, Martin Gammelsæter martingammelsae...@gmail.com wrote: Hi again! I am having problems when using GROUP BY on both SQLContext and HiveContext (same problem). My code (simplified as much as possible) can be seen here: http://pastebin.com/33rjW67H In short, I'm getting data from a Cassandra store with Datastax' new driver (which works great by the way, recommended!), and mapping it to a Spark SQL table through a Product class (Dokument in the source). Regular SELECTs and stuff works fine, but once I try to do a GROUP BY, I get the following error: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:25 failed 4 times, most recent failure: Exception failure in TID 63 on host 192.168.121.132: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158) org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64) org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195) org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) scala.collection.AbstractIterator.to(Iterator.scala:1157) scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) scala.collection.AbstractIterator.toArray(Iterator.scala:1157) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) What am I doing wrong? -- Best regards, Martin Gammelsæter
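Concretely, with a hypothetical table registered from the Dokument class (the column version is a stand-in, since the real schema is only in the pastebin; id comes from the error message), a shape like the first query below fails for the reason Michael gives, while the second is valid:

// invalid: id appears in SELECT but not in GROUP BY or an aggregate
sqlContext.sql("SELECT id, version FROM dokumenter GROUP BY version")
// valid: id only appears inside an aggregate function
sqlContext.sql("SELECT version, COUNT(id) FROM dokumenter GROUP BY version")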
Re: Need help on Spark UDF (Join) Performance tuning .
It's likely that since your UDF is a black box to Hive's query optimizer, it must choose a less efficient join algorithm that passes all possible matches to your function for comparison. This will happen any time your UDF touches attributes from both sides of the join. In general, you can learn more about the chosen execution strategy by running EXPLAIN. On Jul 18, 2014 12:04 PM, S Malligarjunan smalligarju...@yahoo.com wrote: Hello Experts, I would appreciate your input highly. Please suggest or give me a hint: what would be the issue here? Thanks and Regards, Malligarjunan S. On Thursday, 17 July 2014, 22:47, S Malligarjunan smalligarju...@yahoo.com wrote: Hello Experts, I am facing a performance problem when I use a UDF function call. Please help me to tune the query. Please find the details below: shark> select count(*) from table1; OK 151096 Time taken: 7.242 seconds shark> select count(*) from table2; OK 938 Time taken: 1.273 seconds *Without UDF:* shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1 WHERE pvc1.col1 = pvc2.col2 AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS'); OK 328 Time taken: 200.487 seconds shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1 WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2) AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS'); OK 331 Time taken: 292.86 seconds *With UDF:* shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1 WHERE testCompare(pvc1.col1, pvc1.col2, pvc2.col1, pvc2.col2) AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS'); OK 331 Time taken: 3718.23 seconds The above UDF query takes much more time to run, where testCompare is a UDF; the function just does pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2. Please let me know what the issue is here. Thanks and Regards, Sankar S.
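For example, prefixing the slow query with EXPLAIN in the shark shell should show which join strategy was chosen:

shark> EXPLAIN SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1 WHERE testCompare(pvc1.col1, pvc1.col2, pvc2.col1, pvc2.col2) AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');

Comparing that plan against the EXPLAIN output of the UDF-free version should make the difference in join strategy visible.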
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
Can you tell us more about your environment? Specifically, are you also running on Mesos? On Jul 18, 2014 12:39 AM, Victor Sheng victorsheng...@gmail.com wrote: When I run a query against a hadoop file: mobile.registerAsTable("mobile") val count = sqlContext.sql("select count(1) from mobile") res5: org.apache.spark.sql.SchemaRDD = SchemaRDD[21] at RDD at SchemaRDD.scala:100 == Query Plan == ExistingRdd [data_date#0,mobile#1,create_time#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176 When I run collect, count.collect(), it throws exceptions. Can anyone help me? Job aborted due to stage failure: Task 3.0:22 failed 4 times, most recent failure: Exception failure in TID 153 on host wh-8-210: java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$ $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$1.next(Iterator.scala:853) scala.collection.Iterator$$anon$1.head(Iterator.scala:840) org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181) org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176) org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) java.lang.Thread.run(Thread.java:722) Driver stacktrace: java.lang.ExceptionInInitializerError at $line11.$read$$iwC.<init>(console:6) at $line11.$read.<init>(console:26) at $line11.$read$.<init>(console:30) at $line11.$read$.<clinit>(console) at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$1.next(Iterator.scala:853) at scala.collection.Iterator$$anon$1.head(Iterator.scala:840) at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181) at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) My classpath is:
/app/hadoop/spark-1.0.1/assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop0.20.2-cdh3u5.jar (System Classpath)
/app/hadoop/spark-1.0.1/conf (System Classpath)
/app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.3.2.jar (System Classpath)
/app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.6.6.jar (System Classpath)
/app/hadoop/spark-1.0.1/lib_managed/jars/ST4-4.0.4.jar (System Classpath)
Re: Cannot connect to hive metastore
See the section on advanced dependency management: http://spark.apache.org/docs/latest/submitting-applications.html On Jul 17, 2014 10:53 PM, linkpatrickliu linkpatrick...@live.com wrote: Seems like the mysql connector jar is not included in the classpath. Where can I set the jar in the classpath? hive-site.xml:
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=UTF-8</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>
Log: 14/07/18 11:46:58 ERROR DDLTask: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1143) at org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1128) at org.apache.hadoop.hive.ql.exec.DDLTask.showTables(DDLTask.java:2236) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:333) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:65) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1414) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1192) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1020) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:189) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:163) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:250) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:100) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:75) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:78) at $line9.$read$$iwC$$iwC$$iwC$$iwC.<init>(console:15) at $line9.$read$$iwC$$iwC$$iwC.<init>(console:20) at $line9.$read$$iwC$$iwC.<init>(console:22) at $line9.$read$$iwC.<init>(console:24) at $line9.$read.<init>(console:26) at $line9.$read$.<init>(console:30) at $line9.$read$.<clinit>(console) at $line9.$eval$.<init>(console:7) at $line9.$eval$.<clinit>(console) at $line9.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753) at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
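Per that section of the docs, a sketch of what this looks like (the connector path and version are placeholders):

spark-shell --jars /path/to/mysql-connector-java-5.1.30-bin.jar

The same --jars flag works with spark-submit; the listed jars are shipped to the driver and executors.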
Re: can we insert and update with spark sql
You can do insert into. As with other SQL on HDFS systems there is no updating of data. On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Is this what you are looking for? https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html According to the doc, it says Operator that acts as a sink for queries on RDDs and can be used to store the output inside a directory of Parquet files. This operator is similar to Hive's INSERT INTO TABLE operation in the sense that one can choose to either overwrite or append to a directory. Note that consecutive insertions to the same table must have compatible (source) schemas. Thanks Best Regards On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote: Hi As for spark 1.0, can we insert and update a table with SPARK SQL, and how? Thanks Best Regard
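A minimal sketch of the insert path through HiveContext (assuming Hive tables named target and source already exist; hql is the 1.0-era method):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.hql("INSERT INTO TABLE target SELECT * FROM source")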
Visualization/Summary tools for Spark Streaming data
Hi, This is my second week of working with Spark, so pardon me if this is an elementary question in the Spark domain. I am looking for ways to render the output of Spark Streaming. First, let me describe the problem. I am monitoring (via pushes from devices every minute) temperature/humidity and other environmental parameters across 1000 installations. This data comes to me via Kafka and goes into Spark Streaming for processing. There I query my own mySQL database to find out if a specific location is exceeding the thresholds configured for that site; if it does, the job spits out an alarm which is stored in the mySQL DB. For all mySQL access I am using the java odbc interface. Now the question: all the trend data that I am saving for various environmental parameters, how do I graph/visualize it? Are there any examples of how folks are doing visualization of data collected/refined by Spark Streaming? I watched the DataBricks presentation (https://www.youtube.com/watch?v=dJQ5lV5Tldw); that is the kind of system I am looking for, but they are under a limited beta, so there is no way for me to get hands on that tech. Are there any other platforms that offer similar functionality for visualizing data under Apache Spark? Regards, -Subodh
Reading Avro Sequence Files
I'm trying to read an Avro sequence file using the sequenceFile method on the Spark context object, and I get a NullPointerException. If I read the file outside of Spark using AvroSequenceFile.Reader I don't have any problems. Has anyone had success in doing this? Below is what I typed and saw at the Spark shell: scala> var myAvroSequenceFile = sc.sequenceFile("hdfs://my url is here", classOf[AvroKey[GenericRecord]], classOf[AvroValue[GenericRecord]]) scala> myAvroSequenceFile.first 14/07/18 16:31:31 INFO FileInputFormat: Total input paths to process : 1 14/07/18 16:31:31 INFO SparkContext: Starting job: first at console:18 14/07/18 16:31:31 INFO DAGScheduler: Got job 2 (first at console:18) with 1 output partitions (allowLocal=true) 14/07/18 16:31:31 INFO DAGScheduler: Final stage: Stage 2(first at console:18) 14/07/18 16:31:31 INFO DAGScheduler: Parents of final stage: List() 14/07/18 16:31:31 INFO DAGScheduler: Missing parents: List() 14/07/18 16:31:31 INFO DAGScheduler: Computing the requested partition locally 14/07/18 16:31:31 INFO HadoopRDD: Input split: hdfs://my url 14/07/18 16:31:31 INFO DAGScheduler: Failed to run first at console:18 java.lang.NullPointerException at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1902) at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1765) at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1714) at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1728) at org.apache.hadoop.mapred.SequenceFileRecordReader.<init>(SequenceFileRecordReader.java:43) at org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:59) at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:190) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:181) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:93) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Avro-Sequence-Files-tp10201.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reading Avro Sequence Files
Correction: I get a null pointer exception when I attempt to perform an action like 'first'. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Avro-Sequence-Files-tp10201p10202.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NullPointerException When Reading Avro Sequence Files
I think you probably want to use `AvroSequenceFileOutputFormat` with `newAPIHadoopFile`. I'm not even sure that in Hadoop you would use SequenceFileInputFormat to read an Avro sequence file. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-reading-Avro-Sequence-files-tp10201p10203.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NullPointerException When Reading Avro Sequence Files
Thanks for responding. I tried using the newAPIHadoopFile method and got an IOException with the message Not a data file. If anyone has an example of this working I'd appreciate your input or examples. What I entered at the repl and what I got back are below: val myAvroSequenceFile = sc.newAPIHadoopFile("hdfs://my url", classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable]) scala> myAvroSequenceFile.first() 14/07/18 17:02:38 INFO FileInputFormat: Total input paths to process : 1 14/07/18 17:02:38 INFO SparkContext: Starting job: first at console:19 14/07/18 17:02:38 INFO DAGScheduler: Got job 0 (first at console:19) with 1 output partitions (allowLocal=true) 14/07/18 17:02:38 INFO DAGScheduler: Final stage: Stage 0(first at console:19) 14/07/18 17:02:38 INFO DAGScheduler: Parents of final stage: List() 14/07/18 17:02:38 INFO DAGScheduler: Missing parents: List() 14/07/18 17:02:38 INFO DAGScheduler: Computing the requested partition locally 14/07/18 17:02:38 INFO NewHadoopRDD: Input split: hdfs:my url 14/07/18 17:02:38 WARN AvroKeyInputFormat: Reader schema was not set. Use AvroJob.setInputKeySchema() if desired. 14/07/18 17:02:38 INFO AvroKeyInputFormat: Using a reader schema equal to the writer schema. 14/07/18 17:02:38 INFO DAGScheduler: Failed to run first at console:19 org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:585) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:563) Caused by: java.io.IOException: Not a data file. at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:105) at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:97) at org.apache.avro.mapreduce.AvroRecordReaderBase.createAvroFileReader(AvroRecordReaderBase.java:180) at org.apache.avro.mapreduce.AvroRecordReaderBase.initialize(AvroRecordReaderBase.java:90) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:114) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:100) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:62) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:261) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:578) ... 1 more -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-reading-Avro-Sequence-files-tp10201p10204.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
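If I remember right, avro-mapred ships an AvroSequenceFileInputFormat alongside the output format mentioned above, which would be the matching reader for AvroSequenceFile data; a sketch, untested, and the generic parameters may need adjusting:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroKey, AvroValue}
import org.apache.avro.mapreduce.AvroSequenceFileInputFormat

val rdd = sc.newAPIHadoopFile(
  "hdfs://my url",
  classOf[AvroSequenceFileInputFormat[AvroKey[GenericRecord], AvroValue[GenericRecord]]],
  classOf[AvroKey[GenericRecord]],
  classOf[AvroValue[GenericRecord]])

AvroKeyInputFormat, by contrast, expects an Avro container file, which would explain the Not a data file error when pointing it at a sequence file.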
Re: Large scale ranked recommendation
If you are performing recommendations via a latent factor model, then I highly recommend you look into methods of approximate nearest neighbors. At Spotify we batch process top-N recommendations for 40M users with a catalog of 40M items, but we avoid the naive O(n*m) process you are describing by performing an approximate nearest neighbors search. There are a bunch of open source packages you can use, including our own, https://github.com/spotify/annoy, which uses random projections in your latent factor space to build a forest of trees with constant-time nearest neighbor lookup. On Fri, Jul 18, 2014 at 1:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Agree GPUs may be interesting for this kind of massively parallel linear algebra on reasonably sized vectors. These projects might be of interest in this regard: https://github.com/BIDData/BIDMach https://github.com/BIDData/BIDMat https://github.com/dlwh/gust Nick On Fri, Jul 18, 2014 at 7:40 PM, m3.sharma sharm...@umn.edu wrote: Thanks Nick. The real-time suggestion is good; we will see if we can add that to our deployment strategy, and you are correct that we may not need recommendations for each user. We will try adding more resources and the broadcasting-item-features suggestion, as currently the item features don't seem to be huge. As both users and items will continue to grow in the future, I think a few GPU nodes will suffice to serve faster recommendations (faster vector computations) after learning the model with Spark. It would be great to have built-in GPU support in Spark, to leverage the GPU capability of nodes for performing these flops faster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark sql left join gives KryoException: Buffer overflow
Unfortunately, this is a query for which we just don't have an efficient implementation yet. You might try switching the table order. Here is the JIRA for doing something more efficient: https://issues.apache.org/jira/browse/SPARK-2212 On Fri, Jul 18, 2014 at 7:05 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, We have a query with a left join and got this error: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 It looks like Spark SQL tried to do a broadcast join, collecting one of the tables to the master, but it is too large. How do we explicitly control join behavior like this? -- Pei-Lun Lee
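As a possible stopgap for the overflow itself (separate from the join-strategy issue), it may be worth raising the Kryo buffer in spark-defaults.conf; the size here is a guess to tune against your data:

spark.kryoserializer.buffer.mb 64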
Re: Visualization/Summary tools for Spark Streaming data
You might check out Bokeh (http://bokeh.pydata.org), which is a Python (and other languages) system for streaming and big-data visualization targeting the browser. I just gave a talk at SciPy 2014 where you can hear more and see examples: https://www.youtube.com/watch?v=B9NpLOyp-dI -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualization-Summary-tools-for-Spark-Streaming-data-tp10200p10207.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Memory compute-intensive tasks
I also tried increasing --num-executors to numNodes * coresPerNode and using coalesce(numNodes*10,true), and it still ran all the tasks on one node. It seems like it is placing all the executors on one node (though not always the same node, which indicates it is aware of more than one!). I'm using spark-submit --master yarn --deploy-mode cluster with spark-1.0.1 built for hadoop 2.4 on HDP 2.1/Hadoop 2.4. There's clearly just something wrong with my Hadoop configuration, or in how I'm submitting my spark job - any suggestions? Thanks, Ravi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p10209.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Broadcasting a set in PySpark
You have to use `myBroadcastVariable.value` to access the broadcasted value; see https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables On Fri, Jul 18, 2014 at 2:56 PM, Vedant Dhandhania ved...@retentionscience.com wrote: Hi All, I am trying to broadcast a set in a PySpark script. I create the set like this: Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect()) Then execute this line: uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set)) An error occurred while calling o104.collectPartitions. : org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 1131:0 was 23503247 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. So I tried broadcasting it: Uid_male_setbc = sc.broadcast(Uid_male_set) Uid_male_setbc <pyspark.broadcast.Broadcast object at 0x1ba2ed0> Then I execute this line: uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc)) File "<stdin>", line 1, in <lambda> TypeError: argument of type 'Broadcast' is not iterable [duplicate 1] So I am stuck either way; the script runs well locally on a smaller dataset, but throws me this error. Could anyone point out how to correct this or where I am going wrong? Thanks *Vedant Dhandhania* *Retention Science* call: 805.574.0873 visit: Site http://www.retentionscience.com/ | like: Facebook http://www.facebook.com/RetentionScience | follow: Twitter http://twitter.com/RetentionSci
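(Applied to the snippet above, that means writing x[0] in Uid_male_setbc.value rather than x[0] in Uid_male_setbc.)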
Re: Large scale ranked recommendation
Christopher, that's really a great idea to search in latent factor space rather than computing each entry of matrix, now the complexity of the problem has reduced drastically from naive O(n*m). Since our data is not that huge I will try exact nbrhood search then fallback to approximate if that don't work. I will look into annoy. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10212.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Broadcasting a set in PySpark
Hi Josh, I did make that change, however I get this error now: 568.492: [GC [PSYoungGen: 1412948K->207017K(1465088K)] 4494287K->3471149K(4960384K), 0.1280200 secs] [Times: user=0.23 sys=0.63, real=0.13 secs] 568.642: [Full GC] Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 708, in count return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 699, in sum return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 619, in reduce vals = self.mapPartitions(func).collect() File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 583, in collect bytesInJava = self._jrdd.collect().iterator() File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 94, in __exit__ self._context._jsc.setCallSite(None) File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 535, in __call__ File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 361, in send_command File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 317, in _get_connection File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 324, in _create_connection File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 431, in start py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to the Java server *Vedant Dhandhania* *Retention Science* call: 805.574.0873 visit: Site http://www.retentionscience.com/ | like: Facebook http://www.facebook.com/RetentionScience | follow: Twitter http://twitter.com/RetentionSci On Fri, Jul 18, 2014 at 3:10 PM, Josh Rosen rosenvi...@gmail.com wrote: You have to use `myBroadcastVariable.value` to access the broadcasted value; see https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables On Fri, Jul 18, 2014 at 2:56 PM, Vedant Dhandhania ved...@retentionscience.com wrote: Hi All, I am trying to broadcast a set in a PySpark script. I create the set like this: Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect()) Then execute this line: uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set)) An error occurred while calling o104.collectPartitions. : org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 1131:0 was 23503247 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. So I tried broadcasting it: Uid_male_setbc = sc.broadcast(Uid_male_set) Uid_male_setbc <pyspark.broadcast.Broadcast object at 0x1ba2ed0> Then I execute this line: uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc)) File "<stdin>", line 1, in <lambda> TypeError: argument of type 'Broadcast' is not iterable [duplicate 1] So I am stuck either way; the script runs well locally on a smaller dataset, but throws me this error. Could anyone point out how to correct this or where I am going wrong? Thanks *Vedant Dhandhania* *Retention Science* call: 805.574.0873 visit: Site http://www.retentionscience.com/ | like: Facebook http://www.facebook.com/RetentionScience | follow: Twitter http://twitter.com/RetentionSci
Running Spark/YARN on AWS EMR - Issues finding file on hdfs?
I'm stumped with this one. I'm using YARN on EMR to distribute my Spark job. The job seems to start up fine initially, but the Spark executor nodes have trouble pulling the jars from the location on HDFS where the master just put the files.

[hadoop@ip-172-16-2-167 ~]$ SPARK_JAR=./spark/lib/spark-assembly-1.0.0-hadoop2.4.0.jar ./spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar /mnt/tmp/GenerateAssetContent/rickshaw-spark-0.0.1-SNAPSHOT.jar --class com.evocalize.rickshaw.spark.applications.GenerateAssetContent --args yarn-standalone --num-workers 3 --master-memory 2g --worker-memory 2g --worker-cores 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/.versions/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
WARNING: This client is deprecated and will be removed in a future version of Spark. Use ./bin/spark-submit with --master yarn
--args is deprecated. Use --arg instead.
--num-workers is deprecated. Use --num-executors instead.
--master-memory is deprecated. Use --driver-memory instead.
--worker-memory is deprecated. Use --executor-memory instead.
--worker-cores is deprecated. Use --executor-cores instead.
14/07/18 22:27:50 INFO client.RMProxy: Connecting to ResourceManager at /172.16.2.167:9022
14/07/18 22:27:51 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 2
14/07/18 22:27:51 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0
14/07/18 22:27:51 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 3072
14/07/18 22:27:51 INFO yarn.Client: Preparing Local resources
14/07/18 22:27:53 INFO yarn.Client: Uploading file:/mnt/tmp/GenerateAssetContent/rickshaw-spark-0.0.1-SNAPSHOT.jar to hdfs://172.16.2.167:9000/user/hadoop/.sparkStaging/application_1405713259773_0014/rickshaw-spark-0.0.1-SNAPSHOT.jar
14/07/18 22:27:57 INFO yarn.Client: Uploading file:/home/hadoop/spark/lib/spark-assembly-1.0.0-hadoop2.4.0.jar to hdfs://172.16.2.167:9000/user/hadoop/.sparkStaging/application_1405713259773_0014/spark-assembly-1.0.0-hadoop2.4.0.jar
14/07/18 22:27:59 INFO yarn.Client: Setting up the launch environment
14/07/18 22:27:59 INFO yarn.Client: Setting up container launch context
14/07/18 22:27:59 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx2048m, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, com.evocalize.rickshaw.spark.applications.GenerateAssetContent, --jar , /mnt/tmp/GenerateAssetContent/rickshaw-spark-0.0.1-SNAPSHOT.jar, --args 'yarn-standalone' , --executor-memory, 2048, --executor-cores, 1, --num-executors , 3, 1, LOG_DIR/stdout, 2, LOG_DIR/stderr)
14/07/18 22:27:59 INFO yarn.Client: Submitting application to ASM
14/07/18 22:27:59 INFO impl.YarnClientImpl: Submitted application application_1405713259773_0014 ...
14/07/18 22:28:23 INFO yarn.Client: Application report from ASM:
 application identifier: application_1405713259773_0014
 appId: 14
 clientToAMToken: null
 appDiagnostics: Application application_1405713259773_0014 failed 2 times due to AM Container for appattempt_1405713259773_0014_02 exited with exitCode: -1000 due to: File does not exist: hdfs://172.16.2.167:9000/user/hadoop/.sparkStaging/application_1405713259773_0014/spark-assembly-1.0.0-hadoop2.4.0.jar .Failing this attempt.. Failing the application.
 appMasterHost: N/A
 appQueue: default
 appMasterRpcPort: -1
 appStartTime: 1405722479547
 yarnAppState: FAILED
 distributedFinalState: FAILED
 appTrackingUrl: ip-172-16-2-167.us-west-1.compute.internal:9026/cluster/app/application_1405713259773_0014
 appUser: hadoop

I tried to ls the file at that location and it doesn't exist either, although Spark could have cleaned it up before exiting. I verified that on EMR all ports are open between the nodes, so this can't be a port issue. What am I missing?
RE: Hive From Spark
Hi Cheng Hao, Thank you very much for your reply. The program runs on Spark 1.0.0 and Hive 0.12.0. The environment setup consisted of building with SPARK_HIVE=true sbt/sbt assembly/assembly, including the resulting jar on all the workers, and copying hive-site.xml to Spark's conf dir. The program is then run as: ./bin/run-example org.apache.spark.examples.sql.hive.HiveFromSpark It's good to know that this example runs well on your machine; could you please give me some insight into what you have done as well? Thank you very much! Jiajia
Re: jar changed on src filesystem
Andrew, Yes, this works after cleaning up the .staging as you suggested. Thanks a lot! Jian
BUG in spark-ec2 script (--ebs-vol-size) and workaround...
Hello all, There is a bug in the spark-ec2 script (perhaps due to a change in the Amazon AMI). The --ebs-vol-size option directs the spark-ec2 script to add an EBS volume of the specified size, and mount it at /vol for a persistent HDFS. To do this, it uses mkfs.xfs which is not available (though mkfs is). To work around this, I was able to run yum install xfsprogs on the master and each slave, and then use the --resume option with the script, and the persistent HDFS actually worked! This has been a frustrating experience, but I've used the spark-ec2 script for several months now, and it's incredibly helpful. I hope this post helps towards fixing the problem! Thanks, -Ben P.S. This is the full initial command I used, in case this is isolated to particular instance types or anything: ec2/spark-ec2 -k ... -i ... -z us-east-1d -s 4 -t m3.2xlarge --ebs-vol-size=250 -m r3.2xlarge launch ... P.P.S. Ganglia is still broken, and has been for a while...
Re: Spark Streaming with long batch / window duration
If you want to process data that spans weeks, then it is best to use a dedicated data store (file system, SQL/NoSQL database, etc.) that is designed for long-term storage and retrieval; Spark Streaming is not designed as a long-term data store. Also, it does not seem like you need low latency, so it might be better to use a combination of Spark Streaming and Spark programs: Spark Streaming to receive data and store it in a long-term data store, and Spark to periodically (every hour? day?) pull the data from the store and process it. You can implement the invertible function yourself in Spark by storing the previous reduced values in the same data store every time the Spark program runs, and then using that data the next time. The great thing is that both programs can share all the map and reduce functions. TD On Fri, Jul 18, 2014 at 12:09 PM, aaronjosephs aa...@placeiq.com wrote: Would it be a reasonable use case of Spark Streaming to have a very large window size (let's say on the scale of weeks)? In this particular case the reduce function would be invertible, which would aid efficiency. I assume that having a larger batch size, since the window is so large, would also lighten the workload for Spark. The sliding duration is not too important; I just want to know if this is reasonable for Spark to handle with any slide duration.
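To make the division of labor concrete, a minimal sketch of the two halves TD describes (a sketch only: the socket source, the 60-second batch interval, the HDFS paths, the tab-separated state files, and the word-count-style reduce are all illustrative assumptions, and conf/sc are the usual driver-side handles):

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Streaming half: no week-long windows; just land each batch in storage.
val ssc = new StreamingContext(conf, Seconds(60))
ssc.socketTextStream("host", 9999).foreachRDD { (rdd, time) =>
  rdd.saveAsTextFile("hdfs:///events/batch-" + time.milliseconds)
}
// ssc.start(); ssc.awaitTermination()

// Batch half (run hourly or daily): fold newly arrived data into the previous
// reduced values and subtract data that has aged out of the window, rather
// than re-reducing weeks of raw input.
val prev = sc.textFile("hdfs:///state/latest")
             .map { line => val Array(k, v) = line.split("\t"); (k, v.toLong) }
val arrived = sc.textFile("hdfs:///events/new/*").map(w => (w, 1L)).reduceByKey(_ + _)
val expired = sc.textFile("hdfs:///events/old/*").map(w => (w, -1L)).reduceByKey(_ + _)
val next = (prev ++ arrived ++ expired).reduceByKey(_ + _).filter(_._2 != 0)  // drop emptied keys
next.map { case (k, v) => k + "\t" + v }.saveAsTextFile("hdfs:///state/next")

The expired RDD is the "invertible" half: the same reduce function is applied to data leaving the window, with negated values, which is exactly what reduceByKeyAndWindow's inverse function would have done incrementally.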
Re: Spark Streaming with long batch / window duration
Unfortunately, for reasons I won't go into, my options for what I can use are limited. It was more of a curiosity to see if Spark could handle a use case like this, since the functionality I wanted fit perfectly into the reduceByKeyAndWindow frame of thinking. Anyway, thanks for answering.
Java null pointer exception while saving hadoop file
Hi, I am getting a null pointer exception while saving data into Hadoop; the code follows below. If I change the last line to

sorted_tup.take(count.toInt).foreach { case ((a, b, c), l) => sc.parallelize(l.toSeq).coalesce(1).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c) }

I am able to save it, but for larger files I get a heap-space error, which I think is due to the take. Can someone please help me with this? Thanks, Durga

import org.apache.spark.SparkContext._

val conf = new SparkConf()
  .setMaster(master)
  .setAppName(appName)
  .set("spark.cores.max", numCores)
  .setJars(Seq("/home/hadoopuser/testing/Rest_1/FileSplitter/target/scala-2.10/filesplitter_2.10-1.0.jar"))
  .set("spark.executor.memory", "5g")
val sc = new SparkContext(conf)
val action_results = sc.textFile(inputData)

import scala.util.parsing.json.JSON
val actions = action_results.map(l => JSON.parseFull(l).get).cache()
val tuples = actions.map { l =>
  var m = l.asInstanceOf[Map[Any, Any]]
  ((m("deviceId").asInstanceOf[Map[Any, Any]]("$numberLong").asInstanceOf[String],
    m("actionName").asInstanceOf[String],
    m("timestamp").asInstanceOf[Map[Any, Any]]("$date").asInstanceOf[String].substring(0, 10)), l)
}
val tup_grp = tuples.groupByKey
val tup_counts = tup_grp.map { case ((d: String, a: String, t: String), g) => ((d, a, t), g.toArray) }
val sorted_tup = tup_counts.sortByKey(true)
//val count = sorted_tup.count
//println("Sorted Tuples: " + sorted_tup.count)
sorted_tup.foreach { case ((a, b, c), l: Array[Any]) =>
  val lines = sc.parallelize(l.toSeq)
  lines.coalesce(2, true).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
}
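No answer appears in this digest, but the failing block shows a known pitfall: the closure passed to foreach runs on the executors, and the SparkContext used inside it (sc.parallelize) exists only on the driver, which commonly surfaces as a NullPointerException. A hedged sketch of a driver-side alternative, reusing the names above and assuming the set of (device, action, date) keys is small enough to collect:

// Sketch: keep every SparkContext call on the driver. Cache the grouped RDD,
// bring just the (small) key set back, then run one filter-and-save job per
// key. This launches one Spark job per key, so it only makes sense when the
// number of keys is modest.
sorted_tup.cache()
val allKeys = sorted_tup.keys.collect()
for ((a, b, c) <- allKeys) {
  sorted_tup.filter(_._1 == ((a, b, c)))
            .flatMap(_._2)
            .coalesce(2, shuffle = true)
            .saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
}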
Re: BUG in spark-ec2 script (--ebs-vol-size) and workaround...
Thanks a lot for reporting this. I think we just missed installing xfsprogs on the AMI. I have a fix for this at https://github.com/mesos/spark-ec2/pull/59. After the pull request is merged, any new clusters launched should have mkfs.xfs. Thanks, Shivaram On Fri, Jul 18, 2014 at 4:56 PM, Ben Horner ben.hor...@atigeo.com wrote: Hello all, There is a bug in the spark-ec2 script (perhaps due to a change in the Amazon AMI). The --ebs-vol-size option directs the spark-ec2 script to add an EBS volume of the specified size, and mount it at /vol for a persistent HDFS. To do this, it uses mkfs.xfs which is not available (though mkfs is). To work around this, I was able to run yum install xfsprogs on the master and each slave, and then use the --resume option with the script, and the persistent HDFS actually worked! This has been a frustrating experience, but I've used the spark-ec2 script for several months now, and it's incredibly helpful. I hope this post helps towards fixing the problem! Thanks, -Ben P.S. This is the full initial command I used, in case this is isolated to particular instance types or anything: ec2/spark-ec2 -k ... -i ... -z us-east-1d -s 4 -t m3.2xlarge --ebs-vol-size=250 -m r3.2xlarge launch ... P.P.S. Ganglia is still broken, and has been for a while...
Re: unserializable object in Spark Streaming context
I get TD's recommendation of sharing a connection among tasks. Now, is there a good way to determine when to close connections? Gino B.

On Jul 17, 2014, at 7:05 PM, Yan Fang yanfang...@gmail.com wrote: Hi Sean, Thank you. I see your point. What I was thinking is: do the computation in a distributed fashion and do the storing from a single place. But you are right, having multiple DB connections actually is fine. Thanks for answering my questions. That helps me understand the system. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote: On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote: Thank you for the help. If I use TD's approach, it works and there is no exception. The only drawback is that it will create many connections to the DB, which I was trying to avoid. Connection-like objects aren't data that can be serialized. What would it mean to share one connection with N workers? That they all connect back to the driver, and go through one DB connection there? This defeats the purpose of distributed computing. You want multiple DB connections. You can limit the number of partitions if needed. Here is a snapshot of my code, marked in red for the important parts. What I was thinking is that, if I call the collect() method, Spark Streaming will bring the data to the driver and then the db object does not need to be sent The Function you pass to foreachRDD() has a reference to db though. That's what is making it be serialized. to executors. My observation is that, though exceptions are thrown, the insert function still works. Any thoughts about that? I also pasted the log in case it helps: http://pastebin.com/T1bYvLWB Any executors that run locally might skip the serialization and succeed (?) but I don't think the remote executors can be succeeding.
SparkSQL operator priority
Hello, In what order does SparkSQL deserialize Parquet fields, and is it possible to modify that order? I am using SparkSQL to query a parquet file that consists of a lot of fields (around 30 or so). Let me call an example table MyTable and suppose the name of one of its fields is position. The query that I am executing is:

sql("select * from MyTable where position = 243189160")

The query plan that I get from this query is:

Filter (position#6L:6 = 243189160)
 ParquetTableScan [contig.contigName#0,contig.contigLength#1L,contig.contigMD5#2,contig.referenceURL#3,contig.assembly#4,contig.species#5,position#6L,rangeOffset#7,rangeLength#8,referenceBase#9,readBase#10,sangerQuality#11,mapQuality#12,numSoftClipped#13,numReverseStrand#14,countAtPosition#15,readName#16,readStart#17L,readEnd#18L,recordGroupSequencingCenter#19,recordGroupDescription#20,recordGroupRunDateEpoch#21L,recordGroupFlowOrder#22,recordGroupKeySequence#23,recordGroupLibrary#24,recordGroupPredictedMedianInsertSize#25,recordGroupPlatform#26,recordGroupPlatformUnit#27,recordGroupSample#28], (ParquetRelation hdfs://ec2-54-89-87-167.compute-1.amazonaws.com:9000/genomes/hg00096.plup), None

I expect 14 entries in the output, but the execution of .collect.foreach(println) takes forever to run on my cluster (more than an hour). Is it safe to assume in my example that SparkSQL deserializes all fields first before applying the filter? If so, can a user change this behavior? To support my assumption I replaced * with position, so my new query is of the form

sql("select position from MyTable where position = 243189160")

and this query runs much faster on the same hardware (2-3 minutes vs 65 min). Any ideas? Thanks, Christos
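Christos's own measurement points at the practical lever: project only the columns the job needs, so the Parquet scan can skip the other ~28. A two-query sketch against the schema above (readName is one of the real fields; whether the position filter is additionally pushed into the scan depends on the Spark SQL version, so the projection is the reliable part):

// Slow in the report above: all ~30 Parquet columns are materialized for
// every row before the filter runs.
val everything = sql("select * from MyTable where position = 243189160")

// Much faster: only the named columns are read from the columnar file.
val narrow = sql("select position, readName from MyTable where position = 243189160")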
Re: unserializable object in Spark Streaming context
That's a good question. My first thought is a timeout. Timing out after tens of seconds should be sufficient. So there could be a timer in the singleton that runs a check every second on when the singleton was last used, and closes the connection after a timeout. Any attempt to use the connection again will create a new connection. TD

On Fri, Jul 18, 2014 at 7:59 PM, Gino Bustelo lbust...@gmail.com wrote: I get TD's recommendation of sharing a connection among tasks. Now, is there a good way to determine when to close connections? Gino B.
Re: Graphx : Perfomance comparison over cluster
Thanks for your interest. I should point out that the numbers in the arXiv paper are from GraphX running on top of a custom version of Spark with an experimental in-memory shuffle prototype. As a result, if you benchmark GraphX at the current master, it's expected that it will be 2-3x slower than GraphLab. The version with in-memory shuffle is here: https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has changed a lot since then, and the way to configure and invoke Spark is different. I can send you the correct configuration/invocation for this if you're interested in benchmarking it.

On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com wrote: Should I use the pagerank application already available in graphx for this purpose or need to modify or need to write my own?

You should use the built-in PageRank. If your graph is available in edge list format, you can run it using the Analytics driver as follows:

~/spark/bin/spark-submit --master spark://$MASTER_URL:7077 --class org.apache.spark.graphx.lib.Analytics ~/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar pagerank $EDGE_FILE --numEPart=$NUM_PARTITIONS --numIter=$NUM_ITERATIONS [--partStrategy=$PARTITION_STRATEGY]

What should be the executor_memory, i.e. maximum or according to graph size?

As much memory as possible while leaving room for the operating system.

Is there any other configuration I should do to have the best performance?

I think the parameters to Analytics above should be sufficient:
- numEPart - should be equal to or a small integer multiple of the number of cores. More partitions improve work balance but also increase memory usage and communication, so in some cases it can even be faster with fewer partitions than cores.
- partStrategy - If your edges are already sorted, you can skip this option, because GraphX will leave them as-is by default and that may be close to optimal. Otherwise, EdgePartition2D and RandomVertexCut are both worth trying.

CC'ing Joey and Dan, who may have other suggestions. Ankur http://www.ankurdave.com/

On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com wrote: Hi, I am trying to compare GraphX and other distributed graph processing systems (GraphLab) on my cluster of 64 nodes, each node having 32 cores and connected with InfiniBand. I looked at http://arxiv.org/pdf/1402.2394.pdf and the stats provided over there. I had a few questions regarding configuration and achieving the best performance. * Should I use the pagerank application already available in graphx for this purpose or need to modify or need to write my own? - If I shouldn't use the inbuilt pagerank, can you share your pagerank application? * What should be the executor_memory, i.e. maximum or according to graph size? * Other than number of cores, executor_memory and partition strategy, is there any other configuration I should do to have the best performance? I am using the following script:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val startgraphloading = System.currentTimeMillis
val graph = GraphLoader.edgeListFile(sc, filepath, true, 32)
val endgraphloading = System.currentTimeMillis

Thanks in advance :)
Re: unserializable object in Spark Streaming context
Actually, let me clarify further. There are a number of possibilities.

1. The easier, less efficient way is to create a connection object every time you do foreachPartition (as shown in the pseudocode earlier in the thread). For each partition, you create a connection, use it to push all the records in the partition, and then close it. You don't even need a singleton in that case. The cons of this method are:
- You are not reusing connections across tasks and jobs, so you will be creating a lot of connections to the database, which may or may not be fine.
- It gets worse if your partitions are tiny and pushing each partition takes a few hundred milliseconds or a few seconds (as is possible with Spark Streaming).

2. The slightly harder, but more efficient way is to use a singleton, which can contain one connection or maintain a connection pool. Connections in the pool are then created on demand, not explicitly closed at the end of the task, and reused across tasks and jobs. In that case, closing a connection requires some kind of timeout mechanism, as I explained in the previous post. Care also needs to be taken to check whether the connections are threadsafe.

Hope this helps! TD

On Fri, Jul 18, 2014 at 8:14 PM, Tathagata Das tathagata.das1...@gmail.com wrote: That's a good question. My first thought is a timeout. Timing out after tens of seconds should be sufficient. So there could be a timer in the singleton that runs a check every second on when the singleton was last used, and closes the connection after a timeout. Any attempt to use the connection again will create a new connection. TD On Fri, Jul 18, 2014 at 7:59 PM, Gino Bustelo lbust...@gmail.com wrote: I get TD's recommendation of sharing a connection among tasks. Now, is there a good way to determine when to close connections? Gino B.
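A minimal sketch of option 2 in Scala, following the timeout mechanics described above. The Connection trait, the createConnection() factory, the insert() call, and the 30-second idle timeout are illustrative placeholders, not a real client API:

import java.util.{Timer, TimerTask}

// Placeholders: swap in your actual client's connection type and factory.
trait Connection { def insert(record: Any): Unit; def close(): Unit }
def createConnection(): Connection = ???   // e.g. open a JDBC/socket connection

// One lazily created connection per executor JVM, shared across tasks and
// closed by a daemon timer after it has been idle for 30 seconds.
object ConnectionHolder {
  private val idleTimeoutMs = 30 * 1000L
  private val timer = new Timer(true)      // daemon thread, won't block JVM exit
  private var conn: Connection = null
  private var lastUsed = 0L

  def get(): Connection = synchronized {
    if (conn == null) {
      conn = createConnection()
      timer.schedule(new TimerTask {
        def run() = ConnectionHolder.synchronized {
          if (conn != null && System.currentTimeMillis - lastUsed > idleTimeoutMs) {
            conn.close()
            conn = null
            cancel()                       // this reaper's job is done
          }
        }
      }, idleTimeoutMs, idleTimeoutMs)
    }
    lastUsed = System.currentTimeMillis
    conn
  }
}

// Usage from the streaming job:
//   dstream.foreachRDD { rdd =>
//     rdd.foreachPartition { records =>
//       val c = ConnectionHolder.get()    // reused across tasks in this JVM
//       records.foreach(r => c.insert(r))
//     }
//   }

Whether insert() may safely be called concurrently from multiple task threads depends on the client library, which is the threadsafety caveat TD raises above.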
Re: Graphx : Perfomance comparison over cluster
Thanks a lot Ankur. "The version with in-memory shuffle is here: https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has changed a lot since then, and the way to configure and invoke Spark is different. I can send you the correct configuration/invocation for this if you're interested in benchmarking it." Actually I wanted to see how GraphLab and GraphX perform on the cluster we have (32 cores per node and InfiniBand). I tried the LiveJournal graph with partitions = 400 (16 nodes, each with 32 cores), but it performed better with partitions = 64. I'll try it again. Does the suggested version with in-memory shuffle affect performance that much? (According to the previously reported numbers, GraphX did 10 iterations in 142 seconds, and in the latest stats it does it in 68 seconds.) Is the in-memory shuffle the only thing that changed?
SeattleSparkMeetup: Spark at eBay - Troubleshooting the everyday issues
We're coming off a great Seattle Spark Meetup session with Evan Chan (@evanfchan), "Interactive OLAP Queries with @ApacheSpark and #Cassandra" (http://www.slideshare.net/EvanChan2/2014-07olapcassspark), at Whitepages. Now we're proud to announce that our next session is "Spark at eBay - Troubleshooting the Everyday Issues" (http://meetu.ps/2dcsXs) at the Expedia Building on August 8th. Come join us!