Re: heterogeneous cluster setup
To reiterate, it's very important for Spark's workers to have the same memory available. Think of Spark uniformly chopping up your data and distributing the work to the nodes. The algorithm is not designed to account for one worker having less memory available than another.

On Thu, Dec 4, 2014 at 12:11 AM, rapelly kartheek kartheek.m...@gmail.com wrote:
*It's very important for Spark's workers to have the same resources available* So each worker should have the same amount of memory and the same number of cores. Heterogeneity of the cluster in the physical layout of the CPUs is understandable, but what about heterogeneity with respect to memory?

On Thu, Dec 4, 2014 at 12:18 PM, Victor Tso-Guillen v...@paxata.com wrote:
You'll have to decide which resource is more expensive in your heterogeneous environment and optimize for the utilization of that. For example, you may decide that memory is the only cost factor and you can discount the number of cores. Then you could have 8 GB on each worker, each with four cores. Note that cores in Spark don't necessarily map to cores on the machine; it's just a configuration setting for how many simultaneous tasks that worker can work on. You are right that each executor gets the same amount of resources, and I would add the same level of parallelism. Your heterogeneity is in the physical layout of your cluster, not in how Spark treats the workers as resources. It's very important for Spark's workers to have the same resources available because Spark needs to be able to generically divide and conquer your data amongst all those workers. Hope that helps, Victor

On Wed, Dec 3, 2014 at 10:04 PM, rapelly kartheek kartheek.m...@gmail.com wrote:
Thank you so much for the valuable reply, Victor. That's a very clear solution. Right now I have nodes with: 16 GB RAM, 4 cores; 8 GB RAM, 4 cores; 8 GB RAM, 2 cores. From my understanding, the division could be something like: each executor gets 2 cores and 6 GB RAM, so the nodes with 16 GB RAM and 4 cores can host two executors. Please let me know if my understanding is correct. But I don't see any heterogeneity in this setting, as each executor gets the same amount of resources. Can you please clarify this doubt? Regards, Karthik

On Wed, Dec 3, 2014 at 11:11 PM, Victor Tso-Guillen v...@paxata.com wrote:
I don't have a great answer for you. For us, we found a common divisor, not necessarily a whole gigabyte, of the available memory of the different hardware and used that as the amount of memory per worker, and we scaled the number of cores accordingly so that every core in the system has the same amount of memory. The quotient of the available memory and the common divisor (hopefully a whole number, to reduce waste) was the number of workers we spun up. For example, if you have 64 GB, 30 GB, and 15 GB of available memory on your machines, the divisor could be 15 GB and you'd have 4, 2 and 1 workers per machine. Every worker on all the machines would have the same number of cores, set to whatever you think is a good value. Hope that helps.

On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote:
Hi Victor, I want to set up a heterogeneous standalone Spark cluster. I have hardware with different memory sizes and a varied number of cores per node. I could only get all the nodes active in the cluster when the memory per executor was set to the smallest memory size across all nodes, and likewise for the number of cores per executor. As of now, I configure one executor per node.
Can you please suggest a path to setting up a standalone heterogeneous cluster so that I can efficiently use the available hardware? Thank you
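Following Victor's common-divisor scheme, a hypothetical spark-env.sh for the 64 GB machine might look like the sketch below (standalone mode; the values are illustrative, not a tested configuration):

    # spark-env.sh on the 64 GB machine, using a 15 GB common divisor
    export SPARK_WORKER_INSTANCES=4   # 64 GB / 15 GB, rounded down
    export SPARK_WORKER_MEMORY=15g    # memory per worker
    export SPARK_WORKER_CORES=2       # same core count on every worker

    # The 30 GB machine would use SPARK_WORKER_INSTANCES=2 and the 15 GB
    # machine SPARK_WORKER_INSTANCES=1, with identical MEMORY and CORES.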
Re: Any ideas why a few tasks would stall
I ran into something similar before. 19/20 partitions would complete very quickly, and 1 would take the bulk of the time and of the shuffle reads/writes. This was because the majority of partitions were empty, and 1 had all the data. Perhaps something similar is going on here. I would suggest taking a look at how much data each partition contains and trying to achieve a roughly even distribution for best performance. In particular, if the RDDs are PairRDDs, partitions are assigned based on the hash of the key, so an even distribution of values among keys is required for an even split of data across partitions.

On December 2, 2014 at 4:15:25 PM, Steve Lewis (lordjoe2...@gmail.com) wrote:
1) I can go there but none of the links are clickable
2) When I see something like 116/120 partitions succeeded in the Stages UI, in the Storage UI I see the following. NOTE: RDD 27 has 116 partitions cached, 4 not, and that is exactly the number of machines which will not complete. Also, RDD 27 does not show up in the Stages UI.

    RDD Name | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in Tachyon | Size on Disk
    2        | Memory Deserialized 1x Replicated | 1                 | 100%            | 11.8 MB        | 0.0 B           | 0.0 B
    14       | Memory Deserialized 1x Replicated | 1                 | 100%            | 122.7 MB       | 0.0 B           | 0.0 B
    7        | Memory Deserialized 1x Replicated | 120               | 100%            | 151.1 MB       | 0.0 B           | 0.0 B
    1        | Memory Deserialized 1x Replicated | 1                 | 100%            | 65.6 MB        | 0.0 B           | 0.0 B
    10       | Memory Deserialized 1x Replicated | 24                | 100%            | 160.6 MB       | 0.0 B           | 0.0 B
    27       | Memory Deserialized 1x Replicated | 116               | 97%             |                |                 |

On Tue, Dec 2, 2014 at 3:43 PM, Sameer Farooqui same...@databricks.com wrote:
Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically under http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow-running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code.

On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote:
I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data, things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work, stall after a fixed number (about 1000) of calls, and even after hours make no more progress. This is my first large and complex job with Spark and I would like any insight on how to debug the issue, or even better, why it might exist. The cluster has 15 machines and I am setting executor memory at 16G. Also, what other questions are relevant to solving the issue?
-- Steven M. Lewis PhD, 4221 105th Ave NE, Kirkland, WA 98033, 206-384-1340 (cell), Skype lordjoe_com
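One way to act on Ankit's suggestion and inspect the per-partition record counts is a quick mapPartitionsWithIndex pass (a minimal sketch; `rdd` stands in for whichever RDD feeds the slow stage):

    // Count records per partition without collecting the records themselves.
    val counts = rdd.mapPartitionsWithIndex { (idx, iter) =>
      Iterator((idx, iter.size))
    }.collect()

    // Print the ten largest partitions to spot any skew.
    counts.sortBy(-_._2).take(10).foreach { case (idx, n) =>
      println(s"partition $idx: $n records")
    }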
Re: Serializing with Kryo NullPointerException - Java
Using this dependency:

    <dependency>
      <groupId>com.esotericsoftware</groupId>
      <artifactId>kryo-shaded</artifactId>
      <version>3.0.0</version>
    </dependency>

instead of:

    <dependency>
      <groupId>com.esotericsoftware.kryo</groupId>
      <artifactId>kryo</artifactId>
      <version>2.24.0</version>
    </dependency>

fixed this.

On 2014-12-03 18:15, Robin Keunen wrote:
Hi all, I am having trouble using Kryo and, being new to this kind of serialization, I am not sure where to look. Can someone please help me? :-)

Here is my custom class:

    public class DummyClass implements KryoSerializable {
        private static final Logger LOGGER = LoggerFactory.getLogger(DummyClass.class);
        int value;

        public DummyClass() { }

        public DummyClass(int value) {
            LOGGER.info("hey I'm dum {}!", value);
            this.value = value;
        }

        public int getValue() { return value; }

        public void setValue(int value) { this.value = value; }

        @Override
        public void write(Kryo kryo, Output output) {
            output.writeInt(value);
        }

        @Override
        public void read(Kryo kryo, Input input) {
            this.value = input.readInt();
        }
    }

Here is my registrator:

    public class MyKryoRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {
            kryo.register(DummyClass.class);
        }
    }

And the Spark code:

    SparkConf sparkConf = new SparkConf()
        .setAppName(appName)
        .setMaster(master)
        .setJars(jars)
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.roke.main.MyKryoRegistrator");

    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

    List<DummyClass> dummyClasses = Arrays.asList(
        new DummyClass(1), new DummyClass(2), new DummyClass(3), new DummyClass(4));

    JavaRDD<DummyClass> rdd = sparkContext.parallelize(dummyClasses);
    for (DummyClass dummyClass : rdd.collect())
        LOGGER.info("driver collected {}", dummyClass);

The program fails with the following NullPointerException:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.21.6.68):
    java.lang.NullPointerException:
        com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36)
        com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
        org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
        org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
        org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
        org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
        sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

-- Robin Keunen, Software Engineer, robin.keu...@lampiris.be, www.lampiris.be
Re: Any ideas why a few tasks would stall
Good point, Ankit.

Steve - You can click on the link for '27' in the first column to get a breakdown of how much data is in each of those 116 cached partitions. But really, you also want to understand how much data is in the 4 non-cached partitions, as they may be huge. One thing you can try is .repartition() on the RDD with something like 100 partitions, and then cache this new RDD. See if that spreads the load between the partitions more evenly. Let us know how it goes.

On Thu, Dec 4, 2014 at 12:16 AM, Ankit Soni ankitso...@gmail.com wrote:
I ran into something similar before. 19/20 partitions would complete very quickly, and 1 would take the bulk of the time and of the shuffle reads/writes. This was because the majority of partitions were empty, and 1 had all the data. Perhaps something similar is going on here. I would suggest taking a look at how much data each partition contains and trying to achieve a roughly even distribution for best performance. In particular, if the RDDs are PairRDDs, partitions are assigned based on the hash of the key, so an even distribution of values among keys is required for an even split of data across partitions.

On December 2, 2014 at 4:15:25 PM, Steve Lewis (lordjoe2...@gmail.com) wrote:
1) I can go there but none of the links are clickable
2) When I see something like 116/120 partitions succeeded in the Stages UI, in the Storage UI I see the following. NOTE: RDD 27 has 116 partitions cached, 4 not, and that is exactly the number of machines which will not complete. Also, RDD 27 does not show up in the Stages UI.

    RDD Name                                                   | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in Tachyon | Size on Disk
    2  (http://hwlogin.labs.uninett.no:4040/storage/rdd?id=2)  | Memory Deserialized 1x Replicated | 1                 | 100%            | 11.8 MB        | 0.0 B           | 0.0 B
    14 (http://hwlogin.labs.uninett.no:4040/storage/rdd?id=14) | Memory Deserialized 1x Replicated | 1                 | 100%            | 122.7 MB       | 0.0 B           | 0.0 B
    7  (http://hwlogin.labs.uninett.no:4040/storage/rdd?id=7)  | Memory Deserialized 1x Replicated | 120               | 100%            | 151.1 MB       | 0.0 B           | 0.0 B
    1  (http://hwlogin.labs.uninett.no:4040/storage/rdd?id=1)  | Memory Deserialized 1x Replicated | 1                 | 100%            | 65.6 MB        | 0.0 B           | 0.0 B
    10 (http://hwlogin.labs.uninett.no:4040/storage/rdd?id=10) | Memory Deserialized 1x Replicated | 24                | 100%            | 160.6 MB       | 0.0 B           | 0.0 B
    27 (http://hwlogin.labs.uninett.no:4040/storage/rdd?id=27) | Memory Deserialized 1x Replicated | 116               | 97%             |                |                 |

On Tue, Dec 2, 2014 at 3:43 PM, Sameer Farooqui same...@databricks.com wrote:
Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically under http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow-running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code.

On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote:
I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data, things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work, stall after a fixed number (about 1000) of calls, and even after hours make no more progress. This is my first large and complex job with Spark and I would like any insight on how to debug the issue, or even better, why it might exist. The cluster has 15 machines and I am setting executor memory at 16G. Also, what other questions are relevant to solving the issue?
-- Steven M. Lewis PhD, 4221 105th Ave NE, Kirkland, WA 98033, 206-384-1340 (cell), Skype lordjoe_com
Re: Necessity for rdd replication.
In general, most use cases don't need the RDD to be replicated in memory multiple times; it would be a rare exception to do this. If recomputing a lost partition is really expensive (time-consuming), or if the use case is extremely time-sensitive, then maybe you could replicate it in memory. But in general, you can safely rely on the RDD lineage graph to re-create a lost partition if it gets discarded from memory. As far as extracting better parallelism when the RDD is replicated, that really depends on what sort of transformations and operations you're running against the RDD, but again, generally speaking, you shouldn't need to replicate it.

On Wed, Dec 3, 2014 at 11:54 PM, rapelly kartheek kartheek.m...@gmail.com wrote:
Hi, I was just thinking about the necessity for RDD replication. One category could be something like a large number of threads requiring the same RDD. Even though a single RDD can be shared by multiple threads belonging to the same application, I believe we can extract better parallelism if the RDD is replicated. Am I right? I am eager to know if there are any real-life applications or any other scenarios which force an RDD to be replicated. Can someone please throw some light on the necessity for RDD replication? Thank you
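For reference, the built-in way to replicate an RDD in memory is the "_2" family of storage levels, which keep each cached partition on two nodes (a minimal sketch):

    import org.apache.spark.storage.StorageLevel

    // Each cached partition is stored on two executors, so a lost copy can
    // be served from the replica instead of being recomputed from lineage.
    val replicated = rdd.persist(StorageLevel.MEMORY_ONLY_2)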
Re: Monitoring Spark
Are you running Spark in Local or Standalone mode? In either mode, you should be able to hit port 4040 (to see the Spark Jobs/Stages/Storage/Executors UI) on the machine where the driver is running. However, in local mode you won't have a Spark Master UI on 7080 or a Worker UI on 7081. You can manually set the Spark Stages UI port to something other than 4040 (in case there are conflicts) with the spark.ui.port setting. Also, after setting spark.eventLog.enabled to true, you may also want to specify spark.eventLog.dir to point at a globally visible filesystem like HDFS (unless you're running in local mode).

On Wed, Dec 3, 2014 at 10:01 AM, Isca Harmatz pop1...@gmail.com wrote:
Hello, I'm running Spark on a standalone station and I'm trying to view the event log after the run is finished. I turned on the event log as the site said (spark.eventLog.enabled set to true), but I can't find the log files or get the web UI to work. Any idea on how to do this? Thanks, Isca
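A sketch of the settings discussed above, set programmatically (the HDFS path is an assumption; adjust it to your environment):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs://namenode:8020/spark-events") // globally visible location
      .set("spark.ui.port", "4041")                                   // only needed if 4040 conflicts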
map function
Hi, I have an RDD like below:

    (1, (10, 20))
    (2, (30, 40, 10))
    (3, (30))
    …

Is there any way to map it to this:

    (10,1) (20,1) (30,2) (40,2) (10,2) (30,3) …

Generally, each element might be mapped to multiple output elements. Thanks in advance! Best, Yifan LI
Re: [question]Where can I get the log file
Hi, you can access your logs in your /spark_home_directory/logs/ directory. cat the files there and you will get the logs. Thanks.

On Thu, Dec 4, 2014 at 2:27 PM, FFeng [via Apache Spark User List] ml-node+s1001560n20344...@n3.nabble.com wrote:
I have written data to the Spark log. I can get it through the web interface, but I really want to know whether I can get these log files on my node. Where are they? Thx.
Re: MLLIB model export: PMML vs MLLIB serialization
Hi Sourabh, I came across the same problem as you. One workable solution for me was to serialize the parts of the model that can be used to recreate it later. I serialize the RDDs in my model using saveAsObjectFile, with a timestamp attached to the HDFS path. My other Spark application reads from the latest stored directory in HDFS using sc.objectFile and recreates the most recently trained model for prediction. I think this is not the best solution, but it worked for me. I am also looking for more efficient approaches to this problem of exporting a model to some other application.
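A minimal sketch of the save/reload cycle described above; the path and element type are illustrative, and the directory-listing logic for finding the newest timestamp is omitted:

    // Training application: persist the model's reusable RDD with a timestamp.
    val path = s"hdfs:///models/model-${System.currentTimeMillis()}"
    modelRdd.saveAsObjectFile(path)

    // Serving application: reload the newest directory and rebuild the model.
    val reloaded = sc.objectFile[(Int, Array[Double])](path)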
Re: Problem creating EC2 cluster using spark-ec2
Fantastic, thanks for the quick fix!

On 3 December 2014 at 22:11, Andrew Or and...@databricks.com wrote:
This should be fixed now. Thanks for bringing this to our attention.

2014-12-03 13:31 GMT-08:00 Andrew Or and...@databricks.com:
Yeah, this is currently broken for 1.1.1. I will submit a fix later today.

2014-12-02 17:17 GMT-08:00 Shivaram Venkataraman shiva...@eecs.berkeley.edu:
+Andrew. Actually I think this is because we haven't uploaded the Spark binaries to CloudFront / pushed the change to mesos/spark-ec2. Andrew, can you take care of this?

On Tue, Dec 2, 2014 at 5:11 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:
Interesting. Do you have any problems when launching in us-east-1? What is the full output of spark-ec2 when launching a cluster? (Post it to a gist if it's too big for email.)

On Mon, Dec 1, 2014 at 10:34 AM, Dave Challis dave.chal...@aistemos.com wrote:
I've been trying to create a Spark cluster on EC2 using the documentation at https://spark.apache.org/docs/latest/ec2-scripts.html (with Spark 1.1.1). Running the script successfully creates some EC2 instances, HDFS etc., but it appears to fail to copy across the actual files needed to run Spark. I ran the following commands:

    $ cd ~/src/spark-1.1.1/ec2
    $ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1 --region=eu-west-1 \
        --zone=eu-west-1a --instance-type=m3.medium --no-ganglia launch foocluster

I see the following in the script's output (instance and HDFS setup happens before this):

    ...
    Persistent HDFS installed, won't start by default...
    ~/spark-ec2 ~/spark-ec2
    Setting up spark-standalone
    RSYNC'ing /root/spark/conf to slaves...
    *.eu-west-1.compute.amazonaws.com
    RSYNC'ing /root/spark-ec2 to slaves...
    *.eu-west-1.compute.amazonaws.com
    ./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No such file or directory
    ./spark-standalone/setup.sh: line 27: /root/spark/sbin/start-master.sh: No such file or directory
    ./spark-standalone/setup.sh: line 33: /root/spark/sbin/start-slaves.sh: No such file or directory
    Setting up tachyon
    RSYNC'ing /root/tachyon to slaves...
    ...

(Tachyon setup happens here without any problem.) I can ssh to the master (using ./spark-ec2 login), and looking in /root, it contains:

    $ ls /root
    ephemeral-hdfs hadoop-native mapreduce persistent-hdfs scala shark spark spark-ec2 tachyon

If I look in /root/spark (where the sbin directory should be found), it only contains a single 'conf' directory:

    $ ls /root/spark
    conf

Any idea why spark-ec2 might have failed to copy these files across? Thanks, Dave
SchemaRDD partition on specific column values?
Hi All, I want to hash-partition (and then cache) a SchemaRDD in such a way that the partitions are based on the hash of the values of a column (the ID column in my case). E.g., if my table has an ID column with values 1,2,3,4,5,6,7,8,9 and spark.sql.shuffle.partitions is configured as 3, then there should be 3 partitions and, say for ID=1, all the tuples should be present in one particular partition. My actual use case is that I always get a query in which I have to join 2 cached tables on the ID column: Spark first partitions both tables on ID and then applies the JOIN, and I want to avoid that partitioning step at query time by pre-partitioning on ID (and then caching). Thanks in advance
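A SchemaRDD is not itself a pair RDD, so it has no partitionBy; one workaround at the plain RDD level is to key both data sets by ID and co-partition them with the same partitioner before caching (a sketch; table1, table2, and the extractId key extractor are placeholders):

    import org.apache.spark.HashPartitioner

    val p = new HashPartitioner(3)  // matches spark.sql.shuffle.partitions = 3 above
    val left  = table1.keyBy(extractId).partitionBy(p).cache()  // extractId: hypothetical key extractor
    val right = table2.keyBy(extractId).partitionBy(p).cache()

    // Two RDDs that share a partitioner are joined without a re-shuffle.
    val joined = left.join(right)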
Determination of number of RDDs
Hi, I have a graph and I want to create RDDs equal in number to the nodes in the graph. How can I do that? If I have 10 nodes then I want to create 10 RDDs. Is that possible in GraphX? In the C language we have arrays of pointers. Do we have arrays of RDDs in Spark? Can we create such an array and then parallelize it? Thank You
Re: Determination of number of RDDs
At 2014-12-04 02:08:45 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote:
I have a graph and I want to create RDDs equal in number to the nodes in the graph. How can I do that? If I have 10 nodes then I want to create 10 RDDs. Is that possible in GraphX?

This is possible: you can collect the elements to the driver, then create an RDD for each element. If you have so many elements that collecting them to the driver is infeasible, there's probably an alternative solution that doesn't involve creating one RDD per element. Ankur
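A minimal sketch of Ankur's suggestion, assuming a GraphX graph whose vertex set is small enough to collect:

    // One RDD per vertex; the result is a plain driver-side Array of RDDs
    // (an RDD of RDDs is not possible, as noted in a later reply).
    val vertexIds = graph.vertices.map(_._1).collect()
    val perVertexRdds = vertexIds.map(id => sc.parallelize(Seq(id)))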
Re: MLLib: loading saved model
Hi Sameer, your model recreation should be: val model = new LinearRegressionModel(weights, intercept). Since you have already obtained the weights for the linear regression model using stochastic gradient descent, you just have to use LinearRegressionModel to construct the new model. Other points to note: the weights should be in vector format, so you have to convert the weights to a Vector after reading them from the file, and your intercept will be 0.0, as you mentioned. Regards, Manish
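Put together, the recreation step might look like this (a sketch; savedWeights is an assumed Array[Double] read back from the file, and the prediction features are illustrative):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LinearRegressionModel

    val weights = Vectors.dense(savedWeights)            // convert to an MLlib Vector
    val model = new LinearRegressionModel(weights, 0.0)  // intercept 0.0, as noted above
    val prediction = model.predict(Vectors.dense(1.0, 2.0, 3.0))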
Re: GraphX Pregel halting condition
There's no built-in support for doing this, so the best option is to copy and modify Pregel to check the accumulator at the end of each iteration. This is robust and shouldn't be too hard, since the Pregel code is short and only uses public GraphX APIs. Ankur

At 2014-12-03 09:37:01 -0800, Jay Hutfles jayhutf...@gmail.com wrote:
I'm trying to implement a graph algorithm that does a form of path searching. Once a certain criterion is met on any path in the graph, I want to halt the rest of the iterations. But I can't see how to do that with the Pregel API, since a vertex isn't able to know the state of another, arbitrary vertex (if they're not adjacent). Is there a common pattern for doing something like this? I was thinking of using a custom accumulator where the zero is true and addInPlace is a boolean OR. Each vertex (as part of its vprog) could add to the accumulator, and once a path is found which meets the condition, the accumulator would then have a value of false. But since workers can't read accumulators, I don't see how to use that when deciding whether to iterate again. That is, unless I reimplement the Pregel class with the added check when iterating... Any suggestions? Thanks in advance!
Re: map function
Hi, rdd.flatMap(e => e._2.map(i => (i, e._1))) should work, but I didn't test it, so maybe I'm missing something. Paolo

Sent from my Windows Phone
From: Yifan LI iamyifa...@gmail.com
Sent: 04/12/2014 09:27
To: user@spark.apache.org
Subject: map function

Hi, I have an RDD like below:

    (1, (10, 20))
    (2, (30, 40, 10))
    (3, (30))
    …

Is there any way to map it to this:

    (10,1) (20,1) (30,2) (40,2) (10,2) (30,3) …

Generally, each element might be mapped to multiple output elements. Thanks in advance! Best, Yifan LI
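Applied to the sample data from the question, Paolo's one-liner behaves as follows (a quick sketch; representing the inner groups as Seqs is an assumption about the actual value type):

    val rdd = sc.parallelize(Seq(
      (1, Seq(10, 20)),
      (2, Seq(30, 40, 10)),
      (3, Seq(30))
    ))
    rdd.flatMap(e => e._2.map(i => (i, e._1))).collect()
    // => Array((10,1), (20,1), (30,2), (40,2), (10,2), (30,3))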
Example usage of StreamingListener
Hi! Does anybody have a useful example of the StreamingListener interface? When and how can we use this interface to stop streaming once one batch of data is processed? Thanks a lot
Re: running Spark Streaming just once and stop it
Hi Kal El! Did you manage to stop streaming after the first iteration? If yes, can you share example code? Thanks
Re: MLlib Naive Bayes classifier confidence
That was it, thanks. (Posting here so people know it's the right answer in case they have the same need :) ).

sowen wrote:
Probabilities won't sum to 1 since this expression doesn't incorporate the probability of the evidence, I imagine? It's constant across classes, so it is usually excluded. It would appear as a - log(P(evidence)) term.
Re: Example usage of StreamingListener
Thanks Akhil, you are very helpful.
Re: map function
Thanks, Paolo and Mark. :)

On 04 Dec 2014, at 11:58, Paolo Platter paolo.plat...@agilelab.it wrote:
Hi, rdd.flatMap(e => e._2.map(i => (i, e._1))) should work, but I didn't test it, so maybe I'm missing something. Paolo
Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schma RDD
Hi Davies, thanks for the reply. The problem is that I have empty dictionaries in my field3 as well. It gives me an error:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/root/spark/python/pyspark/sql.py", line 1042, in inferSchema
        schema = _infer_schema(first)
      File "/root/spark/python/pyspark/sql.py", line 495, in _infer_schema
        fields = [StructField(k, _infer_type(v), True) for k, v in items]
      File "/root/spark/python/pyspark/sql.py", line 460, in _infer_type
        raise ValueError("Can not infer type for empty dict")
    ValueError: Can not infer type for empty dict

When I remove the empty dictionary items from each record (that is, when mapping to the main dictionary, if field3 is an empty dict, I do not include it), the record converts from

    { field1: 5, field2: 'string', field3: {} }

to

    { field1: 5, field2: 'string' }

At this point, I get:

    ERROR TaskSetManager: Task 0 in stage 14.0 failed 4 times; aborting job
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/root/spark/python/pyspark/sql.py", line 1044, in inferSchema
        return self.applySchema(rdd, schema)
      File "/root/spark/python/pyspark/sql.py", line 1117, in applySchema
        rows = rdd.take(10)
      File "/root/spark/python/pyspark/rdd.py", line 1153, in take
        res = self.context.runJob(self, takeUpToNumLeft, p, True)
      File "/root/spark/python/pyspark/context.py", line 770, in runJob
        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times,
    most recent failure: Lost task 0.3 in stage 14.0 (TID 22628, ip-172-31-30-89.ec2.internal):
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/root/spark/python/pyspark/worker.py", line 79, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
        for obj in iterator:
      File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
        for item in iterator:
      File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
        yield next(iterator)
      File "/root/spark/python/pyspark/sql.py", line 552, in _drop_schema
        yield converter(i)
      File "/root/spark/python/pyspark/sql.py", line 540, in nested_conv
        return tuple(f(v) for f, v in zip(convs, conv(row)))
      File "/root/spark/python/pyspark/sql.py", line 540, in <genexpr>
        return tuple(f(v) for f, v in zip(convs, conv(row)))
      File "/root/spark/python/pyspark/sql.py", line 508, in <lambda>
        return lambda row: dict((k, conv(v)) for k, v in row.iteritems())
    AttributeError: 'int' object has no attribute 'iteritems'

I am clueless what to do about this. Hope you can help :) Many thanks, SahanB
Re: Does filter on an RDD scan every data item ?
Thanks for the reply! To be honest, I was expecting Spark to have some sort of indexing for keys, which would help it locate keys efficiently. I wasn't using Spark SQL here, but if it helps perform this efficiently I can try it out. Can you please elaborate on how it would be helpful in this scenario? Thanks, Nitin.
Re: Does filter on an RDD scan every data item ?
I'm not sure sample is what I was looking for. As mentioned in another post above, this is what I'm looking for:

1) My RDD contains this structure: Tuple2<CustomTuple, Double>.
2) Each CustomTuple is a combination of string IDs, e.g. CustomTuple.dimensionOne=AE232323, CustomTuple.dimensionTwo=BE232323, CustomTuple.dimensionThree=CE232323, and so on.
3) CustomTuple has overridden equals/hashCode implementations, which identify unique objects and define two distinct objects as equal when the values in dimensionOne, Two, and Three match.
4) Double is a numeric value.
5) I want to create an RDD of 50-100 million or more such tuples in Spark, which can grow over time.
6) My web application would request to process a subset of these millions of rows. The processing is nothing but aggregation / arithmetic functions over this data set. We felt Spark would be the right candidate to process this in a distributed fashion, and it would also help future scalability.

Where we are stuck is that, if the application requests a subset comprising 100 thousand tuples, we would have to construct that many CustomTuple objects and pass them via the Spark driver program to the filter function, which in turn would scan those 100 million rows to generate the subset. I was under the assumption that, since Spark allows key/value storage, there would be some indexing of the stored keys, which would help Spark locate objects.
RE: Spark Streaming empty RDD issue
Hi, based on my knowledge of the current Spark Streaming Kafka connector, I think there's no way for the application user to detect these kinds of failure: they are handled either by the Kafka consumer with the ZK coordinator, or by the ReceiverTracker in Spark Streaming, so I don't think you need to take care of this from the user's perspective. If there are no new messages coming to the consumer, the consumer will simply wait. Thanks, Jerry

-----Original Message-----
From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com]
Sent: Thursday, December 4, 2014 2:47 PM
To: u...@spark.incubator.apache.org
Subject: Spark Streaming empty RDD issue

Hi experts, I am using Spark Streaming to integrate Kafka for real-time data processing. I am facing some issues related to Spark Streaming, so I want to know how we can detect:
1) our connection has been lost
2) our receiver is down
3) Spark Streaming has no new messages to consume.
How can we deal with these issues? I will be glad to hear from you and will be thankful to you.
Re: How to Integrate openNLP with Spark
Did anyone get a chance to look at this? Please provide some help. Thanks, Nikhil
Efficient way to get top K values per key in (key, value) RDD?
Hello everyone, I was wondering what the most efficient way is to retrieve the top K values per key in a (key, value) RDD. The simplest way I can think of is to do a groupByKey, sort the iterables, and then take the top K elements for every key. But groupByKey is an operation that can be very costly. This thread (http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html) seems related; there it is recommended to change the key to include the value we want to sort on, and then perform an aggregate operation. My use case would be to filter an RDD representing the edges of a graph ((srcID, dstID), edgeWeight), so that we retain at most the top K edges according to weight for each (srcID, dstID) key. The graph can have multiple edges between the same two vertices.
Market Basket Analysis
Hello folks: I'd like to do market basket analysis using Spark. What are my options? Thanks, Rohit Pujari, Solutions Architect, Hortonworks
Re: Efficient way to get top K values per key in (key, value) RDD?
You probably want to use combineByKey, creating an empty min-queue for each key. Merge values into the queue while its size is below K. Once it reaches K, only merge a new value if it exceeds the smallest element; if so, add it and remove the smallest element. This gives you an RDD of keys mapped to collections of up to K values each, and should be about as efficient as it gets in general.
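A minimal Scala sketch of this approach, assuming an RDD[((Long, Long), Double)] named edges holding ((srcID, dstID), weight) pairs as in the question:

    import scala.collection.mutable.PriorityQueue

    val k = 10
    val topK = edges.combineByKey(
      // start a min-heap (reversed ordering) holding the first weight
      (w: Double) => PriorityQueue(w)(Ordering[Double].reverse),
      // merge one value: keep at most k weights, evicting the smallest
      (heap: PriorityQueue[Double], w: Double) => {
        heap.enqueue(w)
        if (heap.size > k) heap.dequeue()
        heap
      },
      // merge two heaps coming from different partitions
      (h1: PriorityQueue[Double], h2: PriorityQueue[Double]) => {
        h2.foreach(h1.enqueue(_))
        while (h1.size > k) h1.dequeue()
        h1
      }
    ).mapValues(_.dequeueAll.reverse)   // weights in descending order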
Re: Spark-Streaming: output to cassandra
You can use Datastax's Cassandra connector:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

Thanks, Best Regards

On Thu, Dec 4, 2014 at 8:21 PM, m.sar...@accenture.com wrote:
Hi, I have written the code below, which streams data from Kafka and prints it to the console. I want to extend this so that my data goes into a Cassandra table instead.

    JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    System.out.println("Connection done!");

    JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> message) {
            return message._2();
        }
    });

    //data.print();   --> output to console
    data.foreachRDD(saveToCassandra("mykeyspace", "mytable"));
    jssc.start();
    jssc.awaitTermination();

How should I implement the line data.foreachRDD(saveToCassandra("mykeyspace","mytable")); so that data goes into Cassandra in each batch? And how do I specify a batch? Because if I do Ctrl+C on the console of the streaming job jar, nothing will be entered into Cassandra for sure, since it is getting killed. Please help.

Thanks and Regards, Md. Aiman Sarosh, Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.
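For reference, a Scala sketch of the connector's DStream support (assuming a Scala DStream[String] equivalent of the `data` stream above; keyspace, table, and column names are placeholders, and the table must already exist in Cassandra):

    import com.datastax.spark.connector.SomeColumns
    import com.datastax.spark.connector.streaming._

    // saveToCassandra on a DStream writes every micro-batch as it completes,
    // so there is no separate "per batch" step to implement.
    data.map(Tuple1(_)).saveToCassandra("mykeyspace", "mytable", SomeColumns("value"))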
Failed to read chunk exception
I am running a large job using 4000 partitions. After running for four hours on a 16-node cluster, it fails with the following message. The errors are in Spark code and seem to point to unreliability at the level of the disk. Has anyone seen this, and do you know what is going on and how to fix it?

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 13 in stage 15.827 failed 4 times, most recent failure: Lost task 13.3 in stage 15.827
    (TID 13386, pltrd022.labs.uninett.no): java.io.IOException: failed to read chunk
        org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:348)
        org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
        org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
        ...
Re: Spark-Streaming: output to cassandra
I guess he's already doing so, given the 'saveToCassandra' usage. What I don't understand is the question of how to specify a batch; that doesn't make much sense to me. Could you explain further? -kr, Gerard.

On Thu, Dec 4, 2014 at 5:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
You can use Datastax's Cassandra connector:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
Re: Any ideas why a few tasks would stall
Thanks - I found the same thing. Calling

    boolean forceShuffle = true;
    myRDD = myRDD.coalesce(120, forceShuffle);

worked: there were already 120 partitions, but forcing a shuffle distributes the work. I believe there is a bug in my code causing memory to accumulate as partitions grow in size. With a job over ten times larger, I ran into other issues when raising the number of partitions to 10,000, namely too many open files.
-- Steven M. Lewis PhD, 4221 105th Ave NE, Kirkland, WA 98033, 206-384-1340 (cell), Skype lordjoe_com

On Thu, Dec 4, 2014 at 8:32 AM, Sameer Farooqui same...@databricks.com wrote:
Good point, Ankit. Steve - You can click on the link for '27' in the first column to get a breakdown of how much data is in each of those 116 cached partitions. But really, you also want to understand how much data is in the 4 non-cached partitions, as they may be huge. One thing you can try is .repartition() on the RDD with something like 100 partitions, and then cache this new RDD. See if that spreads the load between the partitions more evenly. Let us know how it goes.
How can a function get a TaskContext
https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/TaskContext.java has a Java implementation of TaskContext with a very useful method:

    /**
     * Return the currently active TaskContext. This can be called inside of
     * user functions to access contextual information about running tasks.
     */
    public static TaskContext get() {
        return taskContext.get();
    }

I would like to call this, but my Spark 1.1 code seems to use a Scala TaskContext lacking a get method. How can one get a TaskContext, and in which versions is get supported?
Stateful mapPartitions
Is it possible to have some state across multiple calls to mapPartitions on each partition, for instance, if I want to keep a database connection open?
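One common pattern is to open the connection once per partition inside mapPartitions and close it when the partition is done; for state that must survive across separate mapPartitions calls on the same executor, a JVM-level singleton is one option. A sketch, where createConnection and lookup are hypothetical helpers:

    val result = rdd.mapPartitions { iter =>
      val conn = createConnection()                         // hypothetical helper
      val out = iter.map(row => lookup(conn, row)).toList   // materialize before closing
      conn.close()
      out.iterator
    }

    // State shared across multiple mapPartitions calls in the same executor JVM:
    object ConnectionHolder {
      lazy val conn = createConnection()  // initialized at most once per executor
    }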
Re: Spark SQL with a sorted file
I'll add that some of our data formats will actually infer this sort of useful information automatically. Both Parquet and cached in-memory tables keep statistics on the min/max value for each column. When you have predicates over these sorted columns, partitions will be eliminated if they can't possibly match the predicate, given the statistics. For Parquet this is new in Spark 1.2 and it is turned off by default (due to bugs we are working with the Parquet library team to fix). Hopefully it will soon be on by default.

On Wed, Dec 3, 2014 at 8:44 PM, Cheng, Hao hao.ch...@intel.com wrote:
You can try writing your own Relation with filter push-down, or use ParquetRelation2 as a workaround (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala). Cheng Hao

-----Original Message-----
From: Jerry Raj [mailto:jerry@gmail.com]
Sent: Thursday, December 4, 2014 11:34 AM
To: user@spark.apache.org
Subject: Spark SQL with a sorted file

Hi, if I create a SchemaRDD from a file that I know is sorted on a certain field, is it possible to somehow pass that information on to Spark SQL so that SQL queries referencing that field are optimized? Thanks, -Jerry
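For reference, a sketch of enabling the off-by-default Parquet predicate pushdown mentioned above (the flag name is per the Spark 1.2 docs; verify it against your release):

    // Off by default in Spark 1.2 because of the Parquet bugs mentioned above.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")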
Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schma RDD
Which version of Spark are you using? inferSchema() was improved to support empty dicts in 1.2+; could you try 1.2-RC1? Also, you can use applySchema():

    from pyspark.sql import *

    fields = [StructField('field1', IntegerType(), True),
              StructField('field2', StringType(), True),
              StructField('field3', MapType(StringType(), IntegerType(), True))]
    schema = StructType(fields)
    rdd2 = rdd.map(lambda x: (x['field1'], x['field2'], x['field3']))
    sqlContext.applySchema(rdd2, schema)

PS: the above code is not tested. Davies

On Thu, Dec 4, 2014 at 4:22 AM, sahanbull sa...@skimlinks.com wrote:
Hi Davies, thanks for the reply. The problem is that I have empty dictionaries in my field3 as well. [...]
Re: Can not see any spark metrics on ganglia-web
I used the command below because I'm using Spark 1.0.2 built with SBT, and it worked:

SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true SPARK_GANGLIA_LGPL=true sbt/sbt assembly
RE: Determination of number of RDDs
Regarding "Can we create such an array and then parallelize it?": parallelizing an array of RDDs — i.e. RDD[RDD[X]] — is not possible, because RDD is not serializable. From: Deep Pradhan [mailto:pradhandeep1...@gmail.com] Sent: 04 December 2014 15:39 To: user@spark.apache.org Subject: Determination of number of RDDs Hi, I have a graph and I want to create RDDs equal in number to the nodes in the graph. How can I do that? If I have 10 nodes then I want to create 10 RDDs. Is that possible in GraphX? Like in the C language we have arrays of pointers — do we have arrays of RDDs in Spark? Can we create such an array and then parallelize it? Thank You
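For completeness, a minimal sketch of what does work — an ordinary driver-side collection of RDDs (assuming a SparkContext named sc; the contents are made up for illustration):

val nodeIds = 1 to 10
// One RDD per graph node, held in a plain Scala collection on the driver.
// This collection itself can never be parallelized into an RDD[RDD[Int]].
val rdds: Seq[org.apache.spark.rdd.RDD[Int]] = nodeIds.map(i => sc.parallelize(Seq(i)))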
Re: Spark metrics for ganglia
Hello Samudrala, did you solve this issue with viewing metrics in Ganglia? I have the same problem. Thanks.
Re: Any ideas why a few tasks would stall
This did not work for me — that is, rdd.coalesce(200, forceShuffle). Does anyone have ideas on how to distribute your data evenly and co-locate partitions of interest?
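Not an answer to the co-location part, but a hedged sketch of even redistribution (keyOf is a hypothetical function you'd supply; shuffle = true forces a full reshuffle):

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits

// Spread records evenly across 200 partitions by hashing a key of your choice:
val even = rdd.map(x => (keyOf(x), x)).partitionBy(new HashPartitioner(200)).values
// Or, without keys, a plain repartition (same as coalesce(200, shuffle = true)):
val spread = rdd.repartition(200)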
Re: SQL query in scala API
Is that Spark SQL? I'm wondering if it's possible without Spark SQL. On Wed, Dec 3, 2014 at 8:08 PM, Cheng Lian lian.cs@gmail.com wrote: You may do this:

table("users").groupBy('zip)('zip, count('user), countDistinct('user))

On 12/4/14 8:47 AM, Arun Luthra wrote: I'm wondering how to do this kind of SQL query with PairRDDFunctions.

SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip

In the Spark Scala API, I can make an RDD (called users) of key-value pairs where the keys are zip (as in ZIP code) and the values are user ids. Then I can compute the count and distinct count like this:

val count = users.mapValues(_ => 1).reduceByKey(_ + _)
val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)

Then, if I want count and countDistinct in the same table, I have to join them on the key. Is there a way to do this without doing a join (and without using SQL or Spark SQL)? Arun
Re: Spark SQL table Join, one task is taking long
Hi Cheng, thank you very much for taking your time and providing a detailed explanation. I tried a few things you suggested, and some more. The ContactDetail table (8 GB) is the fact table and DAgents is the dim table (500 KB) — the reverse of what you were assuming — but your ideas still apply. I tried the following:

a) Cached the smaller dim table in memory:

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1000")
sqlContext.cacheTable("DAgents")

The UI (Stage → Storage) shows it cached as an RDD when I run it.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count

I see no difference in performance; the query takes the same ~1.2 min.

b) I reversed both the order of the tables and the where clause in the query:

val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")

The performance went bad: it took 6-7 min to complete. Changing only the order of the tables in the SELECT, keeping the same where-clause order, performance was similar (1.2-1.4 min).

c) Using the query in a), I tried to keep the storage in columnar fashion with

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

I see no difference in performance; the query takes the same ~1.2 min. Not sure it even works.

d) I tried changing the comma-separated HDFS files to Parquet format in HDFS, reading them back as Parquet, and then running the query:

DAgents.saveAsParquetFile("DAgents.parquet")
FCDRDD.saveAsParquetFile("ContactDetail.parquet")
val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
DAgentsParquetRDD.registerAsTable("DAgentsParquet")
val FContactDetailParquetRDD = sqlContext.parquetFile("ContactDetail.parquet")
FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")
val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet, DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and DAgentsParquet.f1 = 902")
CDJoinQryParquet.map(ta => ta(4)).count

*The query time is actually longer for this join query.* It ended up taking 3.4 min with more data read (2 GB) in shuffle reads. Parquet performed worse than non-Parquet for this join. I then reversed the table order and where clause and ran it for Parquet:

val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet, ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and DAgentsParquet.f1 = ContactDetailParquet.f6")
CDJoinQryParquetReversed.map(ta => ta(4)).count

It ran for 18 min and I had to kill it as it kept on running. *But for queries with no join, Parquet's performance was extremely good.* For example, this query below, with no join, ran in 8 seconds, whereas the same query on non-Parquet data took 30 seconds:

val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet WHERE ContactDetailParquet.f6 = 902")
CDJoinQryParquet0.map(ta => ta(4)).count

*Some potential conclusions (please comment):*

* Order in the where clause seems to matter to the Spark SQL optimizer. In the relational DBs I have worked with, where-clause order is typically only a hint. It would be nice if the Spark SQL optimizer were fixed to ignore clause order and optimize automatically.
* I tried changing just the table order in the SELECT statement for a join, and it also seems to matter when reading data from HDFS (for Parquet, and to a lesser extent for non-Parquet in my case), even when the where-clause order is the same. It would be nice if the SQL optimizer handled this automatically.
* Table joins on huge table(s) are costly. The fact and dimension concepts from star schemas don't translate well to big data (Hadoop, Spark). It may be better to de-normalize and store huge tables to avoid joins; joins seem to be evil. (I have tried de-normalizing when using Cassandra, but that has its own problem of full table scans when running ad-hoc queries where the keys are not known.)

Regards, Venkat
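A hedged aside on experiment (a): if the threshold behaves as in later Spark releases (the maximum table size, in bytes, that will be broadcast), then a value of 1000 bytes would never broadcast a 500 KB dim table, which could explain why it made no difference. A sketch of what a larger threshold would look like (the 10 MB figure is an assumption, not a tested recommendation):

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString) // bytes
sqlContext.cacheTable("DAgents")
val q = sqlContext.sql(
  "SELECT * FROM ContactDetail, DAgents " +
  "WHERE ContactDetail.f6 = DAgents.f1 AND DAgents.f1 = 902")
q.map(ta => ta(4)).count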
spark-ec2 Web UI Problem
Hi Guys: I have successfully installed Apache Spark on Amazon EC2 using the spark-ec2 command, and I can log in to the master node. Here is the installation message:

RSYNC'ing /etc/ganglia to slaves...
ec2-54-148-197-89.us-west-2.compute.amazonaws.com
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Connection to ec2-54-148-197-89.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmetad: [FAILED]
Starting GANGLIA gmetad: [ OK ]
Stopping httpd: [FAILED]
Starting httpd: [ OK ]
Connection to ec2-54-148-248-162.us-west-2.compute.amazonaws.com closed.
Spark standalone cluster started at http://ec2-54-148-248-162.us-west-2.compute.amazonaws.com:8080
Ganglia started at http://ec2-54-148-248-162.us-west-2.compute.amazonaws.com:5080/ganglia

However, I cannot open the web UI. I have checked the security group, which allows connections to port 8080. I cannot figure out how to solve it. Any suggestion is appreciated. Thanks a lot. -- Sincerely Yours Xingwei Yang https://sites.google.com/site/xingweiyang1223/
Unable to run applications on clusters on EC2
I think it is related to my previous question, but I'll separate them. In my previous question, I could not connect to the web UI even though I could log into the cluster without any problem. I also tried lynx localhost:8080 and could get the information about the cluster, and I could use spark-submit to submit a job locally by setting the master to localhost. However, I cannot submit a job to the cluster master, and I get errors like this:

14/12/04 22:14:39 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/12/04 22:14:42 INFO client.AppClient$ClientActor: Connecting to master spark://ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070...
14/12/04 22:14:42 WARN client.AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070]
(the same warning repeats three more times)

Please let me know if you have any clue about it. Thanks a lot. -- Sincerely Yours Xingwei Yang https://sites.google.com/site/xingweiyang1223/
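One detail that may matter here (an assumption on my part, since I can't see the cluster config): the standalone master's default service port is 7077, while the logs above show attempts against 7070. If the master was started on the default port, the submit flag would look like:

spark-submit --master spark://ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7077 <your-app-options>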
How to make symbol for one column in Spark SQL.
I have tried to use the functions where and filter on a SchemaRDD. I have built a case class for the tuples/records in the table like this:

case class Region(num: Int, str1: String, str2: String)

I can also successfully create a SchemaRDD:

scala> val results = sqlContext.sql("select * from region")
results: org.apache.spark.sql.SchemaRDD = SchemaRDD[99] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
ExistingRdd [num#0,str1#1,str2#2], MapPartitionsRDD[4] at mapPartitions at BasicOperators.scala:208

But I cannot use a symbol in the where and filter functions. Here is the log:

scala> results.where('num === 1)
<console>:22: error: value === is not a member of Symbol
       results.where('num === 1)
                          ^

I don't know why. Any suggestions? Thanks, Tim
How to extend an one-to-one RDD of Spark that can be persisted?
In my project I extend a new RDD type that wraps another RDD and some metadata. The code I use is similar to the FilteredRDD implementation:

case class PageRowRDD(
    self: RDD[PageRow],
    @transient keys: ListSet[KeyLike] = ListSet()
) extends RDD[PageRow](self) {
  override def getPartitions: Array[Partition] = firstParent[PageRow].partitions
  override val partitioner = self.partitioner
  override def compute(split: Partition, context: TaskContext) = firstParent[PageRow].iterator(split, context)
}

However, when I try to persist it and reuse it in two transformations, my logs and debugging show that it is being computed twice rather than being reused from memory. The problem is: there is no such problem for FilteredRDD. How do I avoid this?
Re: How to make symbol for one column in Spark SQL.
You need to import sqlContext._

On Thu, Dec 4, 2014 at 2:26 PM, Tim Chou timchou@gmail.com wrote: I have tried to use the functions where and filter on a SchemaRDD. ... (original message quoted above, snipped)
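A minimal spark-shell sketch of the fix (assuming the region table from the question is already registered):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._  // brings in the implicit conversion that adds === to Symbol
val results = sqlContext.sql("select * from region")
results.where('num === 1)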
Re: How to make symbol for one column in Spark SQL.
... Thank you! I'm so stupid... This is the only thing I missed in the tutorial... orz. Thanks, Tim

2014-12-04 16:49 GMT-06:00 Michael Armbrust mich...@databricks.com: You need to import sqlContext._ (earlier quoted messages snipped)
Loading a large Hbase table into SPARK RDD takes quite long time
I am trying to load a large HBase table into a Spark RDD to run a Spark SQL query on the entity. For an entity with about 6 million rows, it takes about 35 seconds to load it into an RDD. Is that expected? Is there any way to shorten the loading process? I have been getting some tips from http://hbase.apache.org/book/perf.reading.html to speed up the process, e.g., scan.setCaching(cacheSize), and only adding the necessary attributes/columns to the scan. I am just wondering if there are other ways to improve the speed. Here is the code snippet:

SparkConf sparkConf = new SparkConf().setMaster("spark://url").setAppName("SparkSQLTest");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
Configuration hbase_conf = HBaseConfiguration.create();
hbase_conf.set("hbase.zookeeper.quorum", url);
hbase_conf.set("hbase.regionserver.port", "60020");
hbase_conf.set("hbase.master", url);
hbase_conf.set(TableInputFormat.INPUT_TABLE, entityName);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("MetaInfo"), Bytes.toBytes("col1"));
scan.addColumn(Bytes.toBytes("MetaInfo"), Bytes.toBytes("col2"));
scan.addColumn(Bytes.toBytes("MetaInfo"), Bytes.toBytes("col3"));
scan.setCaching(this.cacheSize);
hbase_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
    jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
logger.info("count is " + hBaseRDD.cache().count());
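Two further Scan knobs from the HBase client API that are commonly suggested for full-table scans (hedged: whether they actually help here depends on the cluster; the 1000 figure is illustrative):

scan.setCaching(1000)        // more rows per RPC round trip
scan.setCacheBlocks(false)   // don't pollute the region server block cache with one-off scan data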
Re: SQL query in scala API
Disclaimer: I am new to Spark. I did something similar in a prototype which works, but I have not tested it at scale yet:

val agg = users.aggregateByKey(new CustomAggregation())(CustomAggregation.sequenceOp, CustomAggregation.comboOp)

class CustomAggregation() extends Serializable {
  var count = 0L
  var users = Set[String]()
}

object CustomAggregation {
  def sequenceOp(agg: CustomAggregation, user_id: String): CustomAggregation = {
    agg.count += 1
    agg.users += user_id
    agg
  }

  def comboOp(agg: CustomAggregation, agg2: CustomAggregation): CustomAggregation = {
    agg.count += agg2.count
    agg.users ++= agg2.users
    agg
  }
}

That should give you the aggregation; the distinct count is the size of the users set. I hope this helps. Stephane

On Wed, Dec 3, 2014 at 5:47 PM, Arun Luthra arun.lut...@gmail.com wrote: I'm wondering how to do this kind of SQL query with PairRDDFunctions. SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip ... (original message quoted above, snipped)
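A more compact variant of the same idea, using a plain (count, set) pair as the accumulator — a sketch assuming users: RDD[(String, String)] of (zip, userId), as in Arun's description:

import org.apache.spark.SparkContext._  // pair-RDD implicits

val agg = users.aggregateByKey((0L, Set.empty[String]))(
  (acc, user) => (acc._1 + 1, acc._2 + user),   // fold one user id into the accumulator
  (a, b)      => (a._1 + b._1, a._2 ++ b._2))   // merge per-partition accumulators
// (count, distinct count) per zip, with no join:
val countAndDistinct = agg.mapValues { case (count, set) => (count, set.size) }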
Re: Spark 1.1.0 Can not read snappy compressed sequence file
Yes, it is working with this in spark-env.sh:

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.0.4.1.jar

I tried so many things that I do not even know where I got this from :-( Stephane

On Wed, Nov 26, 2014 at 8:08 AM, cjdc cristovao.corde...@cern.ch wrote: Hi, did you get a solution for this?
printing mllib.linalg.vector
Basic question: What is the best way to loop through one of these and print their components? Convert them to an array? Thanks Deb
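A small sketch (assuming an org.apache.spark.mllib.linalg.Vector; toArray gives you a plain Array[Double] to loop over):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val v: Vector = Vectors.dense(1.0, 2.0, 3.0)
v.toArray.foreach(println)                      // one component per line
println(v.toArray.mkString("[", ", ", "]"))     // all components on one line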
Re: Loading a large Hbase table into SPARK RDD takes quite long time
Hi, what is your cluster setup? How much memory do you have? How much space does one row consisting only of the 3 columns consume? Do you run other stuff in the background? Best regards

On 04.12.2014 23:57, bonnahu bonn...@gmail.com wrote: I am trying to load a large HBase table into a Spark RDD to run a Spark SQL query on the entity. For an entity with about 6 million rows, it takes about 35 seconds to load it into an RDD. ... (original message and code snippet quoted above, snipped)
Getting all the results from MySQL table
Hi, I am a new Spark and Scala user. I was trying to use JdbcRDD to query a MySQL table. It needs a lower bound and an upper bound as parameters, but I want to get all the records from the table in a single query. Is there a way I can do that?
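One hedged sketch of the usual workaround: the query passed to JdbcRDD must contain two '?' placeholders for the bounds, so you can make the predicate degenerate and use a single partition (the connection details and table name below are made up):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val rows = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.mysql.jdbc.Driver")  // load the JDBC driver
    DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")
  },
  "SELECT * FROM mytable WHERE ? <= 1 AND 1 <= ?",  // always true for bounds (1, 1)
  1, 1, 1,                                          // lowerBound, upperBound, numPartitions
  (r: ResultSet) => r.getString(1))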
representing RDF literals as vertex properties
@ankurdave's concise code at https://gist.github.com/ankurdave/587eac4d08655d0eebf9, responding to an earlier thread (http://apache-spark-user-list.1001560.n3.nabble.com/How-to-construct-graph-in-graphx-tt16335.html#a16355), shows how to build a graph with multiple edge types (predicates in RDF-speak). I'm also looking at how to represent literals as vertex properties. It seems one way to do this is via positional convention in an Array/Tuple/List that is the VD; i.e., to represent height, weight, and eyeColor, the VD could be a Tuple3(Double, Double, String). If any of the properties can be present or not, then the code needs to be precise about which elements of the Array/Tuple/List are present and which are not. E.g., to assign only weight, it could be Tuple3(Option(Double), 123.4, Option(String)). Given that vertices can have many, many properties, memory consumption for the properties should be as parsimonious as possible. Will any of Array/Tuple/List support sparse usage? Is Option the way to get there? Is this a reasonable approach for representing vertex properties, or is there a better way?
Re: Stateful mapPartitions
I want to have a database connection per partition of the RDD, and then reuse that connection whenever mapPartitions is called, which results in compute being called on the partition. On Thu, Dec 4, 2014 at 11:07 AM, Paolo Platter paolo.plat...@agilelab.it wrote: Could you provide some further details ? What do you nerd to do with db cpnnection? Paolo Inviata dal mio Windows Phone -- Da: Akshat Aranya aara...@gmail.com Inviato: 04/12/2014 18:57 A: user@spark.apache.org Oggetto: Stateful mapPartitions Is it possible to have some state across multiple calls to mapPartitions on each partition, for instance, if I want to keep a database connection open?
Re: Market Basket Analysis
Hi, On Thu, Dec 4, 2014 at 11:58 PM, Rohit Pujari rpuj...@hortonworks.com wrote: I'd like to do market basket analysis using spark, what're my options? To do it or not to do it ;-) Seriously, could you elaborate a bit on what you want to know? Tobias
Re: Stateful mapPartitions
Hi, On Fri, Dec 5, 2014 at 3:56 AM, Akshat Aranya aara...@gmail.com wrote: Is it possible to have some state across multiple calls to mapPartitions on each partition, for instance, if I want to keep a database connection open? If you're using Scala, you can use a singleton object, this will exist once per JVM (i.e., once per executor), like object DatabaseConnector { lazy val conn = ... } Please be aware that shutting down the connection is much harder than opening it, because you basically have no idea when processing is done for an executor, AFAIK. Tobias
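Fleshed out slightly — a sketch under the same caveat about shutdown (the JDBC URL and the example RDD are made up):

object DatabaseConnector {
  // Initialized at most once per executor JVM, on first access.
  lazy val conn = java.sql.DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")
}

val rdd = sc.parallelize(1 to 100)
rdd.mapPartitions { iter =>
  val c = DatabaseConnector.conn  // reused across mapPartitions calls on this executor
  iter.map { x =>
    // ... use c here ...
    x
  }
}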
Re: representing RDF literals as vertex properties
At 2014-12-04 16:26:50 -0800, spr s...@yarcdata.com wrote: I'm also looking at how to represent literals as vertex properties. It seems one way to do this is via positional convention in an Array/Tuple/List that is the VD; i.e., to represent height, weight, and eyeColor, the VD could be a Tuple3(Double, Double, String). [...] Given that vertices can have many many properties, it seems memory consumption for the properties should be as parsimonious as possible. Will any of Array/Tuple/List support sparse usage? Is Option the way to get there?

Storing vertex properties positionally with Array[Option[Any]] or any of the other sequence types will give a dense representation. For a sparse representation, the right data type is a Map[String, Any], which will let you access properties by name and will only store the nonempty properties. Since the value type in the map has to be Any — or, more precisely, the least upper bound of the property types — this sacrifices type safety and you'll have to downcast when retrieving properties. If there are particular subsets of the properties that frequently go together, you could instead use a class hierarchy. For example, if the vertices are either people or products, you could use the following:

sealed trait VertexProperty extends Serializable
case class Person(name: String, weight: Int) extends VertexProperty
case class Product(name: String, price: Int) extends VertexProperty

Then you could pattern match against the hierarchy instead of downcasting:

List(Person("Bob", 180), Product("chair", 800), Product("desk", 200)).flatMap {
  case Person(name, weight) => Array.empty[Int]
  case Product(name, price) => Array(price)
}.sum

Ankur
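And for the sparse Map-based representation Ankur mentions, a short sketch (property names made up; note the explicit downcast on retrieval):

val props: Map[String, Any] = Map("height" -> 1.8, "eyeColor" -> "brown")     // weight absent
val height: Option[Double] = props.get("height").map(_.asInstanceOf[Double])  // Some(1.8)
val weight: Option[Double] = props.get("weight").map(_.asInstanceOf[Double])  // None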
Re: Market Basket Analysis
Sure. I'm looking to perform frequent itemset analysis on a POS data set. Apriori is a classic algorithm used for such tasks. Since an Apriori implementation is not part of MLlib yet (see https://issues.apache.org/jira/browse/SPARK-4001), what are some other options/algorithms I could use to perform a similar task? If there's no spoon-to-spoon substitute, spoon-to-fork will suffice too. Hopefully this provides some clarification. Thanks, Rohit

From: Tobias Pfeiffer t...@preferred.jp Date: Thursday, December 4, 2014 at 7:20 PM To: Rohit Pujari rpuj...@hortonworks.com Cc: user@spark.apache.org Subject: Re: Market Basket Analysis

Hi, On Thu, Dec 4, 2014 at 11:58 PM, Rohit Pujari rpuj...@hortonworks.com wrote: I'd like to do market basket analysis using spark, what're my options? To do it or not to do it ;-) Seriously, could you elaborate a bit on what you want to know? Tobias
spark assembly jar caused changed on src filesystem error
Hi all, when I execute:

/spark-1.1.1-bin-hadoop2.4/bin/spark-submit --verbose --master yarn-cluster --class spark.SimpleApp --jars /spark-1.1.1-bin-hadoop2.4/lib/spark-assembly-1.1.1-hadoop2.4.0.jar --executor-memory 1G --num-executors 2 /spark-1.1.1-bin-hadoop2.4/testfile/simple-project-1.0.jar

it fails with this error:

appDiagnostics: Application application_1417686359838_0011 failed 2 times due to AM Container for appattempt_1417686359838_0011_02 exited with exitCode: -1000 due to: Resource hdfs://192.168.70.23:9000/user/root/.sparkStaging/application_1417686359838_0011/spark-assembly-1.1.1-hadoop2.4.0.jar changed on src filesystem (expected 1417745205278, was 1417745206168)

When I remove the --jars option, namely execute:

/spark-1.1.1-bin-hadoop2.4/bin/spark-submit --verbose --master yarn-cluster --class spark.SimpleApp --executor-memory 1G --num-executors 2 /spark-1.1.1-bin-hadoop2.4/testfile/simple-project-1.0.jar

it fails with the error below:

appDiagnostics: Application application_1417686359838_0012 failed 2 times due to AM Container for appattempt_1417686359838_0012_02 exited with exitCode: -1000 due to: File does not exist: hdfs://192.168.70.23:9000/user/root/.sparkStaging/application_1417686359838_0012/spark-assembly-1.1.1-hadoop2.4.0.jar. Failing this attempt. Failing the application.

So my questions are: 1. Will Spark automatically upload the assembly jar to HDFS, or do I need to upload it manually using the --jars option? 2. If it must be uploaded with --jars, what is the reason for the "changed on src filesystem" error, and how can I solve it? I have tried deleting all the jar files in the .sparkStaging folder, but the error still exists. I'd appreciate it very much if anyone can help me with this. Thanks in advance. Thanks Best Regard LEO HU CDSP SAP LABS CHINA
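On question 1, one commonly used setup (hedged: I can't verify it fixes this exact timestamp error) is to upload the assembly to HDFS once yourself and point spark.yarn.jar at it in spark-defaults.conf, so spark-submit stops re-uploading it on every run. A sketch, with an illustrative HDFS path:

spark.yarn.jar  hdfs://192.168.70.23:9000/user/root/spark-assembly-1.1.1-hadoop2.4.0.jar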
Exception adding resource files in latest Spark
I got the following error during Spark startup (yarn-client mode):

14/12/04 19:33:58 INFO Client: Uploading resource file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar -> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:///
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
        at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
        at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
        at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
        at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
        at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
        at org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
        at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
        at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
        at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
        at $iwC$$iwC.<init>(<console>:9)
        at $iwC.<init>(<console>:18)
        at <init>(<console>:20)
        at .<init>(<console>:24)

I'm using the latest Spark built from master HEAD yesterday. Is this a bug? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Loading a large Hbase table into SPARK RDD takes quite long time
Hi, here is the configuration of the cluster:

Workers: 2
For each worker: Cores: 24 Total, 0 Used; Memory: 69.6 GB Total, 0.0 B Used

I didn't set spark.executor.memory, so it should be the default value, 512 MB. As for how much space one row consisting only of the 3 columns consumes: the size of the 3 columns is very small, probably less than 100 bytes.
SPARK LIMITATION - more than one case class is not allowed !!
Is it a limitation that Spark does not support more than one case class at a time? Regards, Rahul
Re: Exception adding resource files in latest Spark
Looks like somehow Spark failed to find the core-site.xml in /etc/hadoop/conf. I've already set the following env variables:

export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HBASE_CONF_DIR=/etc/hbase/conf

Should I put $HADOOP_CONF_DIR/* on HADOOP_CLASSPATH? Jianshi

On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I got the following error during Spark startup (yarn-client mode): ... (stack trace quoted above, snipped) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Loading a large Hbase table into SPARK RDD takes quite long time
Hi Ted, here is the information about the regions:

Region Server                  Region Count
http://regionserver1:60030/    44
http://regionserver2:60030/    39
http://regionserver3:60030/    55
Re: Exception adding resource files in latest Spark
Actually my HADOOP_CLASSPATH has already been set to include /etc/hadoop/conf/*:

export HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase classpath)

Jianshi

On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like somehow Spark failed to find the core-site.xml in /etc/hadoop/conf ... (earlier messages and stack trace snipped) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SPARK LIMITATION - more than one case class is not allowed !!
On Fri, Dec 5, 2014 at 12:53 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote: Is it a limitation that spark does not support more than one case class at a time. What do you mean? I do not have the slightest idea what you *could* possibly mean by "supporting a case class". Tobias
Window function by Spark SQL
Hi all, how can I group by one column and order by another, then select the first row for each group (which is just what a window function does) in Spark SQL? Best Regards, Kevin.
SparkContext.textfile() cannot load file using UNC path on windows
SparkContext.textFile() cannot load a file using a UNC path on Windows. I run the following on Windows XP:

val conf = new SparkConf().setAppName("testproj1.ClassificationEngine").setMaster("local")
val sc = new SparkContext(conf)
sc.textFile(raw"\\10.209.128.150\TempShare\SvmPocData\reuters-two-categories.load").count() // This line throws the following exception

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file://10.209.128.150/TempShare/SvmPocData/reuters-two-categories.load
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
        at org.apache.spark.rdd.RDD.count(RDD.scala:904)
        at testproj1.ClassificationEngine$.buildIndex(ClassificationEngine.scala:49)
        at testproj1.ClassificationEngine$.main(ClassificationEngine.scala:36)
        at testproj1.ClassificationEngine.main(ClassificationEngine.scala)

If I use a local path, it works:

sc.textFile(raw"C:/temp/Share/SvmPocData/reuters-two-categories.load").count()
sc.textFile(raw"C:\temp\Share\SvmPocData\reuters-two-categories.load").count()

I tried the other forms of the UNC path below and always got the same exception:

sc.textFile(raw"//10.209.128.150/TempShare/SvmPocData/reuters-two-categories.load").count()
sc.textFile(raw"file://10.209.128.150/TempShare/SvmPocData/reuters-two-categories.load").count()
sc.textFile(raw"file:///10.209.128.150/TempShare/SvmPocData/reuters-two-categories.load").count()
sc.textFile(raw"file:10.209.128.150/TempShare/SvmPocData/reuters-two-categories.load").count()

The UNC path is valid: I can go to Windows Explorer and type \\10.209.128.150\TempShare\SvmPocData\reuters-two-categories.load to open the file in Notepad. Please advise.
Re: SPARK LIMITATION - more than one case class is not allowed !!
Hi Tobias, thanks for your response. I have created object files [person_obj, office_obj] from CSV files [person_csv, office_csv] using case classes [person, office] with the saveAsObjectFile API. Then I restarted spark-shell and loaded the object files using the objectFile API. *Once one object class is loaded successfully, loading the rest gives a serialization error.* So my understanding is that more than one case class is not allowed. Hope I am able to clarify myself. Regards, Rahul
Re: Window function by Spark SQL
Window functions are not supported yet, but there is a PR for it: https://github.com/apache/spark/pull/2953 On 12/5/14 12:22 PM, Dai, Kevin wrote: Hi, ALL How can I group by one column and order by another one, then select the first row for each group (which is just like window function doing) by SparkSQL? Best Regards, Kevin.
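Until then, a hedged RDD-level workaround for "first row per group, ordered by another column" (the record type and field names below are made up for illustration; assumes recs: RDD[Rec]):

import org.apache.spark.SparkContext._  // pair-RDD implicits

case class Rec(grp: String, ord: Int, payload: String)
// Keep, per group, the record with the smallest 'ord':
val firstPerGroup = recs
  .map(r => (r.grp, r))
  .reduceByKey((a, b) => if (a.ord <= b.ord) a else b)
  .values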
Re: SchemaRDD partition on specific column values?
With some quick googling, I learnt that we can provide DISTRIBUTE BY column_name in HiveQL to distribute data based on a column's values. My question now is: if I use DISTRIBUTE BY id, will there be any performance improvement? Will I be able to avoid data movement in the shuffle (the Exchange before the JOIN step) and improve overall performance?
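For reference, a sketch of the syntax in question issued through a HiveContext (the table name is made up; whether the planner can then skip the Exchange before a subsequent join is exactly the open question above, so treat this as illustration, not a confirmed optimization):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val distributed = hiveContext.sql("SELECT * FROM events DISTRIBUTE BY id")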
Re: Exception adding resource files in latest Spark
Looks like the datanucleus*.jar shouldn't appear in the HDFS path in yarn-client mode. Maybe this patch broke yarn-client: https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53

Jianshi

On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Actually my HADOOP_CLASSPATH has already been set to include /etc/hadoop/conf/* ... (earlier messages and stack trace snipped)

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SPARK LIMITATION - more than one case class is not allowed !!
Rahul, On Fri, Dec 5, 2014 at 1:29 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote: I have created objectfiles [person_obj,office_obj] from csv[person_csv,office_csv] files using case classes[person,office] with API (saveAsObjectFile) Now I restarted spark-shell and load objectfiles using API(objectFile). *Once any of one object-class is loaded successfully, rest of object-class gives serialization error.* I have not used saveAsObjectFile, but I think that if you define your case classes in the spark-shell and serialized the objects, and then you restart the spark-shell, the *classes* (structure, names etc.) will not be known to the JVM any more. So if you try to restore the *objects* from a file, the JVM may fail in restoring them, because there is no class it could create objects of. Just a guess. Try to write a Scala program, compile it and see if it still fails when executed. Tobias
Re: Exception adding resource files in latest Spark
Correction: According to Liancheng, this hotfix might be the root cause: https://github.com/apache/spark/commit/38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce

Jianshi

On Fri, Dec 5, 2014 at 12:45 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like the datanucleus*.jar shouldn't appear in the HDFS path in yarn-client mode. ... (earlier messages and stack trace snipped)

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SPARK LIMITATION - more than one case class is not allowed !!
Tobias, thanks for the quick reply. Definitely, after a restart the case classes need to be defined again. I have done so; that's why Spark is able to load the object file [e.g. person_obj] and has maintained its serialVersionUID. When I next try to load another object file [e.g. office_obj], I think Spark is matching against the previous serialVersionUID [person_obj] and giving a mismatch error. In my first post, I have given statements which can be executed easily to replicate this issue. Thanks ~Rahul
Re: SPARK LIMITATION - more than one case class is not allowed !!
Rahul, On Fri, Dec 5, 2014 at 2:50 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote: I have done so thats why spark is able to load objectfile [e.g. person_obj] ... (quoted message snipped) Can you post the Scala source for your case classes? I have tried the following in spark-shell:

case class Dog(name: String)
case class Cat(age: Int)
val dogs = sc.parallelize(Dog("foo") :: Dog("bar") :: Nil)
val cats = sc.parallelize(Cat(1) :: Cat(2) :: Nil)
dogs.saveAsObjectFile("test_dogs")
cats.saveAsObjectFile("test_cats")

This gives two directories, test_dogs/ and test_cats/. Then I restarted spark-shell and entered:

case class Dog(name: String)
case class Cat(age: Int)
val dogs = sc.objectFile("test_dogs")
val cats = sc.objectFile("test_cats")

I don't get an exception, but:

dogs: org.apache.spark.rdd.RDD[Nothing] = FlatMappedRDD[1] at objectFile at <console>:12

Trying to access the elements of the RDD gave:

scala> dogs.collect()
14/12/05 15:08:58 INFO FileInputFormat: Total input paths to process : 8
...
org.apache.spark.SparkDriverExecutionException: Execution error
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:980)
        ...
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object;
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1129)
        ...
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:976)
        ... 10 more

So even in the simplest of cases, this doesn't work for me in the spark-shell, but with a different error. I guess we need to see more of your code to help. Tobias
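One thing that stands out in the output above: the RDD came back as RDD[Nothing], because objectFile was called without a type parameter. A sketch of the same session with explicit element types (hedged: I haven't re-run this exact snippet, but supplying the type is how objectFile is meant to be used):

val dogs = sc.objectFile[Dog]("test_dogs")
val cats = sc.objectFile[Cat]("test_cats")
dogs.collect()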
Re: Exception adding resource files in latest Spark
Thanks for flagging this. I reverted the relevant YARN fix in the Spark 1.2 release. We can try to debug this in master.

On Thu, Dec 4, 2014 at 9:51 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

I created a ticket for this: https://issues.apache.org/jira/browse/SPARK-4757

Jianshi

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to incrementally compile spark examples using mvn
Hi all, I made some code changes in the mllib project and, as mentioned in the previous mails, I did mvn install -pl mllib. Now when I run a program in examples using run-example, the new code is not executing. Instead the previous code itself is running. But if I do an mvn install in the entire spark project, I can see the new code running. But installing the entire spark takes a lot of time, and so it's difficult to do this each time I make some changes. Can someone tell me how to compile mllib alone and get the changes working? Thanks, Regards, Meethu M

On Friday, 28 November 2014 2:39 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:

Hi, I have a similar problem. I modified the code in mllib and examples. I did mvn install -pl mllib and mvn install -pl examples. But when I run the program in examples using run-example, the older version of mllib (before the changes were made) is getting executed. How do I get the changes made in mllib while calling it from the examples project? Thanks, Regards, Meethu M

On Monday, 24 November 2014 3:33 PM, Yiming (John) Zhang sdi...@gmail.com wrote:

Thank you, Marcelo and Sean, mvn install is a good answer for my demands.

-----Original Message----- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: November 21, 2014 1:47 To: yiming zhang Cc: Sean Owen; user@spark.apache.org Subject: Re: How to incrementally compile spark examples using mvn

Hi Yiming, On Wed, Nov 19, 2014 at 5:35 PM, Yiming (John) Zhang sdi...@gmail.com wrote: Thank you for your reply. I was wondering whether there is a method of reusing locally-built components without installing them? That is, if I have successfully built the spark project as a whole, how should I configure it so that I can incrementally build (only) the spark-examples subproject without the need of downloading or installation?

As Sean suggested, you shouldn't need to install anything. After mvn install, your local repo is a working Spark installation, and you can use spark-submit and other tools directly within it. You just need to remember to rebuild the assembly/ project when modifying Spark code (or the examples/ project when modifying examples). -- Marcelo
Re: Spark-Streaming: output to cassandra
Hi Gerard/Akhil,

By "how do I specify a batch" I was trying to ask: when does the data in the JavaDStream get flushed into the Cassandra table? I read somewhere that the streaming data gets written to Cassandra in batches. This batch can be of some particular time, or one particular run. That was what I was trying to understand: how to set that batch in my program. Because if a batch means one cycle run of my streaming app, then in my app I'm hitting Ctrl+C to kill the program. So the program is terminating, and would the data get inserted successfully into my Cassandra table?

For example, in Terminal-A I'm running a Kafka producer to stream in messages. In Terminal-B I'm running my streaming app. In my app there is a line jssc.awaitTermination(); which will keep my app running till I kill it. Eventually I am hitting Ctrl+C in my app terminal, i.e. Terminal-B, and killing it. So it's a kind of ungraceful termination. So in this case, will the data in my app's DStream get written into Cassandra?

Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.

From: Gerard Maas gerard.m...@gmail.com Sent: Thursday, December 4, 2014 10:22 PM To: Akhil Das Cc: Sarosh, M.; user@spark.apache.org Subject: Re: Spark-Streaming: output to cassandra

I guess he's already doing so, given the 'saveToCassandra' usage. What I don't understand is the question "how do I specify a batch". That doesn't make much sense to me. Could you explain further? -kr, Gerard.

On Thu, Dec 4, 2014 at 5:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can use the DataStax Cassandra connector: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md Thanks Best Regards

On Thu, Dec 4, 2014 at 8:21 PM, m.sar...@accenture.com wrote:

Hi, I have written the code below, which streams data from Kafka and prints it to the console. I want to extend this so that my data goes into a Cassandra table instead.

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
System.out.println("Connection done!");
JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> message) {
        return message._2();
    }
});
//data.print();  -- output to console
data.foreachRDD(saveToCassandra("mykeyspace", "mytable"));
jssc.start();
jssc.awaitTermination();

How should I implement the line data.foreachRDD(saveToCassandra("mykeyspace", "mytable")); so that data goes into Cassandra in each batch? And how do I specify a batch? Because if I do Ctrl+C on the console of the streaming-job jar, nothing will be entered into Cassandra for sure, since it is getting killed. Please help.

Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.
RDD.aggregate?
Can someone please explain how RDD.aggregate works? I looked at the average example done with aggregate(), but I'm still confused about this function... Much appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-aggregate-tp20434.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
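aggregate folds each partition with seqOp, starting from a copy of the zero value, then merges the per-partition results with combOp. A minimal sketch of the usual average example (a reconstruction, not the poster's original code):

val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
// The accumulator is (runningSum, count); every partition starts from (0.0, 0).
val (sum, count) = nums.aggregate((0.0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),  // seqOp: fold one element into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: merge two partitions' accumulators
)
val avg = sum / count  // 2.5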
Re: Spark-Streaming: output to cassandra
The batch is the batch duration that you specify while creating the StreamingContext, so at the end of every batch's computation the data will get flushed to Cassandra. And why are you stopping your program with Ctrl+C? You can always specify the time with jssc.awaitTermination(timeout).

Thanks Best Regards

On Fri, Dec 5, 2014 at 11:53 AM, m.sar...@accenture.com wrote:

Hi Gerard/Akhil, By "how do I specify a batch" I was trying to ask: when does the data in the JavaDStream get flushed into the Cassandra table? ...
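To make the batch point concrete, here is a sketch in Scala (the thread's code is Java), assuming the DataStax spark-cassandra-connector linked above; the host, keyspace, table and column names are placeholders, and a socket source stands in for the Kafka stream. Each batch interval closes a batch, and that batch's output is written when its jobs complete, so a timed await plus a graceful stop removes the need for Ctrl+C:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._  // adds saveToCassandra to DStreams

val conf = new SparkConf().setMaster("local[4]").setAppName("SparkStream")
  .set("spark.cassandra.connection.host", "127.0.0.1")  // placeholder host
val ssc = new StreamingContext(conf, Seconds(1))  // 1-second batches

val lines = ssc.socketTextStream("localhost", 9999)  // stand-in for the Kafka source
lines.map(msg => (msg, 1))
  .saveToCassandra("mykeyspace", "mytable", SomeColumns("message", "cnt"))

ssc.start()
ssc.awaitTermination(60000)  // run for 60 seconds instead of waiting for Ctrl+C
ssc.stop(stopSparkContext = true, stopGracefully = true)  // let in-flight batches finish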
Re: SPARK LIMITATION - more than one case class is not allowed !!
Tobias, Please find attached the csv and scala files; the steps are below:

1. Copy the csv files into the current directory.
2. Open spark-shell from this directory.
3. Run the one_scala file, which will create object-files from the csv-files in the current directory.
4. Restart spark-shell.
5. a. Run the two_scala file; while running, it gives an error during loading of office_csv.
   b. If we edit the two_scala file with the contents below ---

case class person(id: Int, name: String, fathername: String, officeid: Int)
case class office(id: Int, name: String, landmark: String, areacode: String)
sc.objectFile[office]("office_obj").count
sc.objectFile[person]("person_obj").count

   --- then while running, it gives an error during loading of person_csv.

Regards, Rahul

sample.gz http://apache-spark-user-list.1001560.n3.nabble.com/file/n20435/sample.gz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-issue-in-case-of-case-class-is-more-than-1-tp20334p20435.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark executor lost
It says connection refused; just make sure the network is configured properly (open the ports between the master and the worker nodes). If the ports are configured correctly, then I assume the process is getting killed for some reason, and hence the connection is refused.

Thanks Best Regards

On Fri, Dec 5, 2014 at 12:30 AM, S. Zhou myx...@yahoo.com.invalid wrote:

Here is a sample exception I collected from a Spark worker node (there are many such errors across the worker nodes). It looks to me like the Spark worker failed to communicate with the executor locally.

14/12/04 04:26:37 ERROR EndpointWriter: AssociationError [akka.tcp://sparkwor...@spark-prod1.xxx:7079] -> [akka.tcp://sparkexecu...@spark-prod1.xxx:47710]: Error [Association failed with [akka.tcp://sparkexecu...@spark-prod1.xxx:47710]]
[akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@spark-prod1.xxx:47710]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: spark-prod1.XXX/10.51.XX.XX:47710

On Wednesday, December 3, 2014 5:05 PM, Ted Yu yuzhih...@gmail.com wrote:

bq. to get the logs from the data nodes

Minor correction: the logs are collected from machines where node managers run. Cheers

On Wed, Dec 3, 2014 at 3:39 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

You want to look further up the stack (there are almost certainly other errors before this happens), and those other errors may give you a better idea of what is going on. Also, if you are running on YARN, you can run yarn logs -applicationId yourAppId to get the logs from the data nodes.

Sent with Good (www.good.com)

-----Original Message----- From: S. Zhou [myx...@yahoo.com.INVALID] Sent: Wednesday, December 03, 2014 06:30 PM Eastern Standard Time To: user@spark.apache.org Subject: Spark executor lost

We are using Spark job server to submit Spark jobs (our Spark version is 0.9.1). After running the Spark job server for a while, we often see the following errors (executor lost) in the Spark job server log. As a consequence, the Spark driver (allocated inside the Spark job server) gradually loses executors. And finally the Spark job server is no longer able to submit jobs. We tried to google for solutions but so far no luck. Please help if you have any ideas. Thanks!

[2014-11-25 01:37:36,250] INFO parkDeploySchedulerBackend [] [akka://JobServer/user/context-supervisor/next-staging] - Executor 6 disconnected, so removing it
[2014-11-25 01:37:36,252] ERROR cheduler.TaskSchedulerImpl [] [akka://JobServer/user/context-supervisor/next-staging] - Lost executor 6 on : remote Akka client disassociated
[2014-11-25 01:37:36,252] INFO ark.scheduler.DAGScheduler [] [] - Executor lost: 6 (epoch 8)
[2014-11-25 01:37:36,252] INFO ge.BlockManagerMasterActor [] [] - Trying to remove executor 6 from BlockManagerMaster.
[2014-11-25 01:37:36,252] INFO storage.BlockManagerMaster [] [] - Removed 6 successfully in removeExecutor
[2014-11-25 01:37:36,286] INFO ient.AppClient$ClientActor [] [akka://JobServer/user/context-supervisor/next-staging] - Executor updated: app-20141125002023-0037/6 is now FAILED (Command exited with code 143)
Re: how do you turn off info logging when running in local mode
Yes, there is a way. Just add the following piece of code before creating the SparkContext:

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

Thanks Best Regards

On Fri, Dec 5, 2014 at 12:48 AM, Ron Ayoub ronalday...@live.com wrote:

I have not yet gotten to the point of running standalone. In my case I'm still working on the initial product; I'm running directly in Eclipse, and I've compiled using the Spark Maven project since the downloadable Spark binaries require Hadoop. With that said, I'm running fine and I have things working, but the INFO logging is so verbose in Eclipse that my own console logging ultimately gets drowned out. Is there a programmatic way to tell Spark to stop logging so much to the console? Thanks.
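The same idea as a self-contained sketch for an in-IDE run (the object name and the tiny job are illustrative): the levels must be set before the SparkContext is constructed, or the startup INFO lines will already have been printed.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object QuietLocalApp {
  def main(args: Array[String]): Unit = {
    // Silence Spark's and Akka's INFO chatter before the context exists.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("quiet-demo"))
    println(sc.parallelize(1 to 100).sum())  // your own output is no longer drowned out
    sc.stop()
  }
}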
Re: spark-ec2 Web UI Problem
It's working: http://ec2-54-148-248-162.us-west-2.compute.amazonaws.com:8080/ If it didn't install correctly, you could try the spark-ec2 script with --resume, I think.

Thanks Best Regards

On Fri, Dec 5, 2014 at 3:11 AM, Xingwei Yang happy...@gmail.com wrote:

Hi Guys: I have successfully installed apache-spark on Amazon EC2 using the spark-ec2 command, and I can log in to the master node. Here is the installation message:

RSYNC'ing /etc/ganglia to slaves... ec2-54-148-197-89.us-west-2.compute.amazonaws.com
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Connection to ec2-54-148-197-89.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmetad: [FAILED]
Starting GANGLIA gmetad: [ OK ]
Stopping httpd: [FAILED]
Starting httpd: [ OK ]
Connection to ec2-54-148-248-162.us-west-2.compute.amazonaws.com closed.
Spark standalone cluster started at http://ec2-54-148-248-162.us-west-2.compute.amazonaws.com:8080
Ganglia started at http://ec2-54-148-248-162.us-west-2.compute.amazonaws.com:5080/ganglia

However, I could not open the web UI. I have checked the security group, which allows connections to port 8080. I could not figure out how to solve it. Any suggestion is appreciated. Thanks a lot.

-- Sincerely Yours Xingwei Yang https://sites.google.com/site/xingweiyang1223/
Clarifications on Spark
Hello, I work for an eCommerce company. Currently we are looking at building a data warehouse platform as described below:

DW as a Service
|
REST API
|
SQL on NoSQL (Drill/Pig/Hive/Spark SQL)
|
NoSQL databases (one or more; maybe RDBMS directly too)
| (bulk load)
MySQL Database

I wish to get a few clarifications on Apache Spark as follows:

1) Can we use Spark for "SQL on NoSQL", or do we need to mix it with Pig/Hive or any other tool for some reason?
2) Can Spark SQL be used as a query interface for business intelligence, analytics and reporting?
3) Does Spark support only Hadoop and HBase? We may use Cassandra/MongoDB/Couchbase as well.
4) Does Spark support RDBMS too? Can we have a single interface to pull out data from multiple data sources?
5) Any recommendations (not limited to the usage of Spark) for our specific requirement described above?

Thanks Ajay

Note: I have posted a similar post on the Drill user list as well, as I am not sure which one best fits our use case.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Clarifications-on-Spark-tp20440.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Auto BroadcastJoin optimization failed in latest Spark
Sorry for the late follow-up. I used Hao's DESC EXTENDED command and found some clues:

new (broadcast broken Spark build):
parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892, COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}

old (broadcast working Spark build):
parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591, totalSize=56166}

Looks like the table size computation failed in the latest version. I've run the analyze command:

ANALYZE TABLE $table COMPUTE STATISTICS noscan

And the tables are created from Parquet files, e.g.:

CREATE EXTERNAL TABLE table1 (
  code int,
  desc string
)
STORED AS PARQUET
LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'

Anyone know what went wrong?

Thanks, Jianshi

On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao hao.ch...@intel.com wrote:

Hi Jianshi, I couldn't reproduce that with the latest MASTER, and I can always get the BroadcastHashJoin for managed tables (in .csv files) in my testing. Are there any external tables in your case? In general, there are a couple of things you can try first (with HiveContext):

1) ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all of the tables);
2) SET spark.sql.autoBroadcastJoinThreshold=xxx; (set the threshold to a greater value; it is 1024*1024*10 by default -- just make sure the maximum dimension table size, in bytes, is less than this);
3) Always put the main table (the biggest table) left-most among the inner joins.

DESC EXTENDED tablename; -- this will print the detailed statistics, including the table size (the field "totalSize")
EXPLAIN EXTENDED query; -- this will print the detailed physical plan.

Let me know if you still have problems. Hao

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, November 27, 2014 10:24 PM To: Cheng, Hao Cc: user Subject: Re: Auto BroadcastJoin optimization failed in latest Spark

Hi Hao, I'm using inner joins, as broadcast join didn't work for left joins (thanks for the links to the latest improvements). And I'm using HiveContext, and it worked in a previous build (10/12) when joining 15 dimension tables. Jianshi

On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote:

Are all of your join keys the same? And I guess the join types are all "Left" joins; https://github.com/apache/spark/pull/3362 is probably what you need. Also, SparkSQL doesn't currently support multiway-join (and multiway-broadcast join); https://github.com/apache/spark/pull/3270 should be another optimization for this.

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Wednesday, November 26, 2014 4:36 PM To: user Subject: Auto BroadcastJoin optimization failed in latest Spark

Hi, I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails to optimize the auto broadcast join in my query. I have a query that joins a huge fact table with 15 tiny dimension tables. I'm currently using an older version of Spark which was built on Oct. 12. Has anyone else met a similar situation?

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
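Hao's checklist in runnable form, as a hedged sketch against a HiveContext (the fact/table1 names, join columns, and the 50 MB threshold are placeholders, not the actual schema): make sure statistics exist, raise the broadcast threshold above the largest dimension table's totalSize, then check the physical plan for a BroadcastHashJoin.

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// 1) Collect statistics so totalSize is known to the optimizer.
hc.sql("ANALYZE TABLE table1 COMPUTE STATISTICS noscan")
// 2) Raise the threshold (1024*1024*10 bytes by default) above the dimension table sizes.
hc.sql("SET spark.sql.autoBroadcastJoinThreshold=52428800")
// 3) Biggest table left-most among the inner joins, then inspect the plan.
hc.sql("EXPLAIN EXTENDED SELECT f.code, d.desc FROM fact f JOIN table1 d ON f.code = d.code")
  .collect().foreach(println)  // look for BroadcastHashJoin in the physical plan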
Re: Auto BroadcastJoin optimization failed in latest Spark
If I run ANALYZE without NOSCAN, then Hive can successfully get the size:

parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589, COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296}

Is Hive's PARQUET support broken?

Jianshi

On Fri, Dec 5, 2014 at 3:30 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

Sorry for the late follow-up. I used Hao's DESC EXTENDED command and found some clues: ...
-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SPARK LIMITATION - more than one case class is not allowed !!
Rahul, On Fri, Dec 5, 2014 at 3:51 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote:

1. Copy the csv files into the current directory.
2. Open spark-shell from this directory.
3. Run the one_scala file, which will create object-files from the csv-files in the current directory.
4. Restart spark-shell.
5. a. Run the two_scala file; while running, it gives an error during loading of office_csv.
   b. If we edit the two_scala file so that office is loaded first, it gives an error during loading of person_csv.

One piece of good news: I can reproduce the error you see. Another: I can tell you how to fix it. In your one.scala file, define all case classes *before* you use saveAsObjectFile() for the first time. With

case class person(id: Int, name: String, fathername: String, officeid: Int)
case class office(id: Int, name: String, landmark: String, areacode: String)
val baseperson = sc.textFile("person_csv").saveAsObjectFile("person_obj")
val baseoffice = sc.textFile("office_csv").saveAsObjectFile("office_obj")

I can deserialize the obj files (in any order). The bad news is: I have no idea about the reason for this. I blame it on the REPL/shell and assume it would not happen for a compiled application. Tobias
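The loading side under the same assumptions (class, file and path names as in the thread): after restarting spark-shell, redefine the case classes first, then read the object files back with explicit type parameters; with one.scala written as above, both loads work in either order.

case class person(id: Int, name: String, fathername: String, officeid: Int)
case class office(id: Int, name: String, landmark: String, areacode: String)

// Classes are defined before any objectFile call, mirroring the fix above.
sc.objectFile[office]("office_obj").count
sc.objectFile[person]("person_obj").count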
Re: Auto BroadcastJoin optimization failed in latest Spark
With Liancheng's suggestion, I've tried setting spark.sql.hive.convertMetastoreParquet to false, but ANALYZE with noscan still returns -1 in rawDataSize.

Jianshi

On Fri, Dec 5, 2014 at 3:33 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

If I run ANALYZE without NOSCAN, then Hive can successfully get the size: ...

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/