Re: How this unit test passed on master trunk?
There are multiple records per key in the DataFrame:

  scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).show
  +---+-----------------------------+
  |  a|min(struct(unresolvedstar()))|
  +---+-----------------------------+
  |  1|                        [1,1]|
  |  3|                        [3,1]|
  |  2|                        [2,1]|
  +---+-----------------------------+

The meaning of .groupBy($"a").agg(min(struct($"record.*"))) is to get the minimum struct over all the records with the same $"a". For example, for TestData2(1,1) :: TestData2(1,2) the result would be (1, [1,1]), since struct(1, 1) is less than struct(1, 2). Please check how the Ordering is implemented in InterpretedOrdering. The output itself does not carry any ordering, so first() can return any of the groups. I am not sure yet why the unit test and the real environment behave differently.

Xiao, I do see the difference between the unit test and a local cluster run. Do you know the reason?

Thanks.

Zhan Zhang

On Apr 22, 2016, at 11:23 AM, Yong Zhang <java8...@hotmail.com> wrote:

Hi,

I was trying to find out why this unit test can pass in the Spark code, in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala, for this unit test:

  test("Star Expansion - CreateStruct and CreateArray") {
    val structDf = testData2.select("a", "b").as("record")
    // CreateStruct and CreateArray in aggregateExpressions
    assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == Row(3, Row(3, 1)))
    assert(structDf.groupBy($"a").agg(min(array($"record.*"))).first() == Row(3, Seq(3, 1)))
    // CreateStruct and CreateArray in project list (unresolved alias)
    assert(structDf.select(struct($"record.*")).first() == Row(Row(1, 1)))
    assert(structDf.select(array($"record.*")).first().getAs[Seq[Int]](0) === Seq(1, 1))
    // CreateStruct and CreateArray in project list (alias)
    assert(structDf.select(struct($"record.*").as("a")).first() == Row(Row(1, 1)))
    assert(structDf.select(array($"record.*").as("a")).first().getAs[Seq[Int]](0) === Seq(1, 1))
  }

From my understanding, the data returned in this case should be Row(1, Row(1, 1)), as that will be the min of the struct. In fact, if I run spark-shell on my laptop, I get the result I expected:

  ./bin/spark-shell
  (Spark 2.0.0-SNAPSHOT ASCII welcome banner)
  Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
  Type in expressions to have them evaluated.
  Type :help for more information.

  scala> case class TestData2(a: Int, b: Int)
  defined class TestData2

  scala> val testData2DF = sqlContext.sparkContext.parallelize(TestData2(1,1) :: TestData2(1,2) :: TestData2(2,1) :: TestData2(2,2) :: TestData2(3,1) :: TestData2(3,2) :: Nil, 2).toDF()

  scala> val structDF = testData2DF.select("a","b").as("record")

  scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).first()
  res0: org.apache.spark.sql.Row = [1,[1,1]]

  scala> structDF.show
  +---+---+
  |  a|  b|
  +---+---+
  |  1|  1|
  |  1|  2|
  |  2|  1|
  |  2|  2|
  |  3|  1|
  |  3|  2|
  +---+---+

So from my Spark, built from master, I cannot get Row(3, Row(3, 1)) back in this case. Why does the unit test assert that Row(3, Row(3, 1)) should be the first row, and why does it pass, while I cannot reproduce that in my spark-shell? I am trying to understand how to interpret the meaning of agg(min(struct($"record.*"))).

Thanks

Yong
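For readers following the thread, here is a minimal, self-contained sketch of the behaviour being discussed, written against a shell-era SQLContext named sqlContext (the data mirrors the quoted session; nothing here is taken from the actual test suite beyond that):

  import org.apache.spark.sql.functions.{min, struct}
  import sqlContext.implicits._

  case class TestData2(a: Int, b: Int)
  val testData2DF = sqlContext.sparkContext.parallelize(
    TestData2(1, 1) :: TestData2(1, 2) ::
    TestData2(2, 1) :: TestData2(2, 2) ::
    TestData2(3, 1) :: TestData2(3, 2) :: Nil, 2).toDF()

  val structDF = testData2DF.select("a", "b").as("record")

  // min(struct(...)) compares structs field by field, left to right, so for
  // the group a == 1 the minimum of struct(1,1) and struct(1,2) is [1,1].
  structDF.groupBy($"a").agg(min(struct($"record.*"))).show()

  // first() merely returns whichever group happens to come back first; with
  // no ORDER BY, the order of groups after an aggregation is not guaranteed,
  // which is why the asserted first row can differ between environments.
  structDF.groupBy($"a").agg(min(struct($"record.*"))).first()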
Re: Save DataFrame to HBase
You can try https://github.com/hortonworks/shc.git, or the package listing at http://spark-packages.org/package/zhzhan/shc. Currently it is in the process of being merged into HBase.

Thanks.

Zhan Zhang

On Apr 21, 2016, at 8:44 AM, Benjamin Kim <bbuil...@gmail.com> wrote:

Hi Ted,

Can this module be used with an older version of HBase, such as 1.0 or 1.1? Where can I get the module from?

Thanks,
Ben

On Apr 21, 2016, at 6:56 AM, Ted Yu <yuzhih...@gmail.com> wrote:

The hbase-spark module in Apache HBase (coming with the hbase 2.0 release) can do this.

On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim <bbuil...@gmail.com> wrote:

Has anyone found an easy way to save a DataFrame into HBase?

Thanks,
Ben
Re: Spark SQL insert overwrite table not showing all the partition.
INSERT OVERWRITE will overwrite any existing data in the table or partition, unless IF NOT EXISTS is provided for a partition (as of Hive 0.9.0, see https://issues.apache.org/jira/browse/HIVE-2612). So the second write replaces whatever the first write put into that partition.

Thanks.

Zhan Zhang

On Apr 21, 2016, at 3:20 PM, Bijay Kumar Pathak <bkpat...@mtu.edu> wrote:

Hi,

I have a job which writes to a Hive table with dynamic partitions. Inside the job I write into the table twice, but I only see the partition from the last write, although I can see in the Spark UI that it processes data for both partitions. Below is the query I am using to write to the table.

  hive_c.sql("""INSERT OVERWRITE TABLE base_table
                PARTITION (date='{0}', date_2)
                SELECT * from temp_table
             """.format(date_val))

Thanks,
Bijay
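If both data sets really do belong in the same target partition, one hedged workaround (a sketch, not taken from the thread; the table, view and variable names are hypothetical) is to fold the two writes into a single INSERT OVERWRITE, for example from a Scala HiveContext:

  // Register each intermediate DataFrame, then overwrite the partition once
  // with the union of the two, so neither write clobbers the other.
  val dateVal = "2016-04-21"                 // hypothetical partition value
  df1.registerTempTable("temp_table_1")
  df2.registerTempTable("temp_table_2")

  hiveContext.sql(s"""
    INSERT OVERWRITE TABLE base_table PARTITION (date='$dateVal', date_2)
    SELECT * FROM temp_table_1
    UNION ALL
    SELECT * FROM temp_table_2
  """)

Note that IF NOT EXISTS only skips the overwrite when the partition already exists; it does not merge data, so the UNION ALL route is usually the simpler option here.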
Re: Spark DataFrame sum of multiple columns
You can define your own UDF; the following (taken from the Spark test suite) is one example of applying a UDF across columns in a select:

  val foo = udf((a: Int, b: String) => a.toString + b)

  checkAnswer(
    // SELECT *, foo(key, value) FROM testData
    testData.select($"*", foo('key, 'value)).limit(3),
    ...)

Thanks

Zhan Zhang

On Apr 21, 2016, at 8:51 PM, Naveen Kumar Pokala <npok...@spcapitaliq.com> wrote:

Hi,

Do we have any way to perform row-level operations on Spark DataFrames?

For example, I have a dataframe with columns A, B, C, ... Z, and I want to add one more column, "New Column", holding the sum of all the column values (e.g. a row with A=1, B=2, C=4, D=3, ..., Z=26 would get New Column = 351).

Can somebody help me with this?

Thanks,
Naveen
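For the specific "sum of all columns" case, a UDF is not strictly needed. A hedged sketch (df and its column names are hypothetical) is to fold the + operator over the Column objects:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.lit

  // Sum every column into "New Column" by folding Column.+ over the columns.
  // Restrict the list to the numeric columns you actually want to add.
  val colsToSum = df.columns.map(df(_))
  val total: Column = colsToSum.foldLeft(lit(0))(_ + _)
  val withSum = df.withColumn("New Column", total)
  withSum.show()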
Re: Why Spark having OutOfMemory Exception?
The data may be not large, but the driver need to do a lot of bookkeeping. In your case, it is possible the driver control plane takes too much memory. I think you can find a java developer to look at the coredump. Otherwise, it is hard to tell exactly which part are using all the memory. Thanks. Zhan Zhang On Apr 20, 2016, at 1:38 AM, 李明伟 mailto:kramer2...@126.com>> wrote: Hi the input data size is less than 10M. The task result size should be less I think. Because I am doing aggregation&reduce on the data At 2016-04-20 16:18:31, "Jeff Zhang" mailto:zjf...@gmail.com>> wrote: Do you mean the input data size as 10M or the task result size ? >>> But my way is to setup a forever loop to handle continued income data. Not >>> sure if it is the right way to use spark Not sure what this mean, do you use spark-streaming, for doing batch job in the forever loop ? On Wed, Apr 20, 2016 at 3:55 PM, 李明伟 mailto:kramer2...@126.com>> wrote: Hi Jeff The total size of my data is less than 10M. I already set the driver memory to 4GB. 在 2016-04-20 13:42:25,"Jeff Zhang" mailto:zjf...@gmail.com>> 写道: Seems it is OOM in driver side when fetching task result. You can try to increase spark.driver.memory and spark.driver.maxResultSize On Tue, Apr 19, 2016 at 4:06 PM, 李明伟 mailto:kramer2...@126.com>> wrote: Hi Zhan Zhang Please see the exception trace below. It is saying some GC overhead limit error I am not a java or scala developer so it is hard for me to understand these infor. Also reading coredump is too difficult to me.. I am not sure if the way I am using spark is correct. I understand that spark can do batch or stream calculation. But my way is to setup a forever loop to handle continued income data. Not sure if it is the right way to use spark 16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread task-result-getter-2 java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328) at scala.collection.immutable.HashMap.updated(HashMap.scala:54) at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109) Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328) at scala.collection.immutable.HashMap.updated(HashMap.scala:54) at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:
Re: Read Parquet in Java Spark
You can try something like the following if you only have one column:

  val rdd = parquetFile.javaRDD().map(row => row.getAs[String](0))

Thanks.

Zhan Zhang

On Apr 18, 2016, at 3:44 AM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

Hi, any idea on this?

Thanks,
Ramkumar

On Mon, Apr 4, 2016 at 2:47 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

I wasn't aware you have a parquet file containing JSON data.

Thanks
Best Regards

On Mon, Apr 4, 2016 at 2:44 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

Hi Akhil, thanks for your help. Why do you put the separator as ","? I have a parquet file which contains only JSON in each line.

On Mon, Apr 4, 2016 at 2:34 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Something like this (in Scala):

  val rdd = parquetFile.javaRDD().map(row => row.mkString(","))

You can create a map operation over your JavaRDD to convert the org.apache.spark.sql.Row (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/Row.html) to String (the Row.mkString() operation).

Thanks
Best Regards

On Mon, Apr 4, 2016 at 12:02 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

Any idea on this? How to convert a parquet file into a JavaRDD<String>?

On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

Hi, thanks for the reply. I tried this. It's returning JavaRDD<Row> instead of JavaRDD<String>. How do I get JavaRDD<String>?

Error: incompatible types: org.apache.spark.api.java.JavaRDD<Row> cannot be converted to org.apache.spark.api.java.JavaRDD<String>

On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY <umesh9...@gmail.com> wrote:

From the Spark documentation:

  DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
  JavaRDD<Row> jRDD = parquetFile.javaRDD();

The javaRDD() method will convert the DF to an RDD.

On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

Hi, I'm trying to read parquet log files in Java Spark. The parquet log files are stored in HDFS. I want to read them and convert them into a JavaRDD. I could only find the SQLContext/DataFrame API. How can I read it if all I have is a SparkContext and RDDs? What is the best way to read it?

Thanks,
Ramkumar
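For completeness, a hedged Scala sketch of the same idea (the HDFS path is hypothetical; in Java the equivalent is parquetFile.toJavaRDD() plus a Function<Row, String> map, as quoted above):

  // Read a parquet file whose single column holds a JSON string per row,
  // and turn it into an RDD[String].
  val parquetDF = sqlContext.read.parquet("hdfs:///logs/events.parquet")

  // If the JSON sits in one string column, pull that column out directly:
  val jsonRdd = parquetDF.map(_.getString(0))

  // Or serialise every row as JSON regardless of schema:
  val jsonRdd2 = parquetDF.toJSON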
Re: Why Spark having OutOfMemory Exception?
What kind of OOM? Driver or executor side? You can use coredump to find what cause the OOM. Thanks. Zhan Zhang On Apr 18, 2016, at 9:44 PM, 李明伟 mailto:kramer2...@126.com>> wrote: Hi Samaga Thanks very much for your reply and sorry for the delay reply. Cassandra or Hive is a good suggestion. However in my situation I am not sure if it will make sense. My requirements is that to get the recent 24 hour data to generate report. The frequency is 5 minute. So if use cassandra or hive, it means spark will have to read 24 hour data every 5 mintues. And among those data, a big part (like 23 hours or more ) will be repeatedly read. The window in spark is for stream computing. I did not use it but I will consider it Thanks again Regards Mingwei At 2016-04-11 19:09:48, "Lohith Samaga M" mailto:lohith.sam...@mphasis.com>> wrote: >Hi Kramer, > Some options: > 1. Store in Cassandra with TTL = 24 hours. When you read the full > table, you get the latest 24 hours data. > 2. Store in Hive as ORC file and use timestamp field to filter out the > old data. > 3. Try windowing in spark or flink (have not used either). > > >Best regards / Mit freundlichen Grüßen / Sincères salutations >M. Lohith Samaga > > >-Original Message- >From: kramer2...@126.com<mailto:kramer2...@126.com> [mailto:kramer2...@126.com] >Sent: Monday, April 11, 2016 16.18 >To: user@spark.apache.org<mailto:user@spark.apache.org> >Subject: Why Spark having OutOfMemory Exception? > >I use spark to do some very simple calculation. The description is like below >(pseudo code): > > >While timestamp == 5 minutes > >df = read_hdf() # Read hdfs to get a dataframe every 5 minutes > >my_dict[timestamp] = df # Put the data frame into a dict > >delete_old_dataframe( my_dict ) # Delete old dataframe (timestamp is one >24 hour before) > >big_df = merge(my_dict) # Merge the recent 24 hours data frame > >To explain.. > >I have new files comes in every 5 minutes. But I need to generate report on >recent 24 hours data. >The concept of 24 hours means I need to delete the oldest data frame every >time I put a new one into it. >So I maintain a dict (my_dict in above code), the dict contains map like >timestamp: dataframe. Everytime I put dataframe into the dict, I will go >through the dict to delete those old data frame whose timestamp is 24 hour ago. >After delete and input. I merge the data frames in the dict to a big one and >run SQL on it to get my report. > >* >I want to know if any thing wrong about this model? Because it is very slow >after started for a while and hit OutOfMemory. I know that my memory is >enough. Also size of file is very small for test purpose. So should not have >memory problem. > >I am wondering if there is lineage issue, but I am not sure. > >* > > > >-- >View this message in context: >http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-having-OutOfMemory-Exception-tp26743.html >Sent from the Apache Spark User List mailing list archive at >Nabble.com<http://Nabble.com>. > >- >To unsubscribe, e-mail: >user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> >For additional commands, e-mail: >user-h...@spark.apache.org<mailto:user-h...@spark.apache.org> > >Information transmitted by this e-mail is proprietary to Mphasis, its >associated companies and/ or its customers and is intended >for use only by the individual or entity to which it is addressed, and may >contain information that is privileged, confidential or >exempt from disclosure under applicable law. 
Re: Problem using limit clause in spark sql
There has to be a central point that collects exactly the requested 10,000 records, and currently the approach is to use one single partition, which is easy to implement. Otherwise, the driver would have to count the number of records in each partition and then decide how many records to materialize from each partition, because some partitions may not have enough records and some may even be empty. I don't see any straightforward workaround for this.

Thanks.

Zhan Zhang

On Dec 23, 2015, at 5:32 PM, 汪洋 <tiandiwo...@icloud.com> wrote:

It is an application running as an HTTP server, so I collect the data as the response.

On Dec 24, 2015, at 8:22 AM, Hudong Wang <justupl...@hotmail.com> wrote:

When you call collect() it will bring all the data to the driver. Do you mean to call persist() instead?

From: tiandiwo...@icloud.com
Subject: Problem using limit clause in spark sql
Date: Wed, 23 Dec 2015 21:26:51 +0800
To: user@spark.apache.org

Hi,

I am using Spark SQL in a way like this:

  sqlContext.sql("select * from table limit 10000").map(...).collect()

The problem is that the limit clause collects all the 10,000 records into a single partition, so the map afterwards runs in only one partition and is really slow. I tried to use repartition, but it is kind of a waste to collect all those records into one partition, then shuffle them around, and then collect them again. Is there a way to work around this?

BTW, there is no order by clause and I do not care which 10,000 records I get, as long as the total number is less than or equal to 10,000.
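For completeness, a small sketch of the imperfect workaround discussed above (the table name, limit, partition count and per-row function are all hypothetical): the limited result still lands in one partition, but an explicit repartition buys the expensive map some parallelism at the cost of one extra shuffle.

  import org.apache.spark.sql.Row

  // Stand-in for the real per-row work done in the map(...) of the thread.
  def expensiveTransform(r: Row): String = r.mkString(",")

  val limited = sqlContext.sql("SELECT * FROM some_table LIMIT 10000")
  val result = limited
    .repartition(32)              // spread the 10,000 rows across the cluster again
    .map(expensiveTransform)      // now runs in 32 tasks instead of 1
    .collect()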
Re: Unable to create hive table using HiveContext
You are using embedded metastore mode, which creates the Derby db locally (in your case the db may already have been created, but you may not have the right permissions on it?). To connect to a remote metastore, hive-site.xml has to be correctly configured.

Thanks.

Zhan Zhang

On Dec 23, 2015, at 7:24 AM, Soni spark <soni2015.sp...@gmail.com> wrote:

Hi friends,

I am trying to create a Hive table through Spark with Java code in Eclipse, using the code below:

  HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
  sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");

but I am getting the error:

  ERROR XBM0J: Directory /home/workspace4/Test/metastore_db already exists.

I am not sure why the metastore is being created in my workspace. Please help me.

Thanks
Soniya
Re: DataFrameWriter.format(String) is there a list of options?
Currently json, parquet, orc (with a HiveContext), and text are natively supported. If you want avro or other formats, you have to include the corresponding package, which is not built into the Spark jar.

Thanks.

Zhan Zhang

On Dec 23, 2015, at 8:57 AM, Christopher Brady <christopher.br...@oracle.com> wrote:

DataFrameWriter.format
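A hedged sketch of the formats mentioned above, for a hypothetical DataFrame df (Spark 1.x-era API):

  df.write.format("json").save("/tmp/out_json")
  df.write.format("parquet").save("/tmp/out_parquet")
  df.write.format("orc").save("/tmp/out_orc")      // ORC needs a HiveContext in this era
  df.write.format("text").save("/tmp/out_text")    // expects a single string column

  // Non-built-in sources are addressed by package name; for example Avro,
  // assuming the databricks spark-avro package is on the classpath
  // (e.g. pulled in with --packages):
  df.write.format("com.databricks.spark.avro").save("/tmp/out_avro")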
Re: Can SqlContext be used inside mapPartitions
SQLContext lives on the driver side, and I don't think you can use it inside executors. How to provide lookup functionality in the executors really depends on how you intend to use the data.

Thanks.

Zhan Zhang

On Dec 22, 2015, at 4:44 PM, SRK wrote:

> Hi,
>
> Can SQLContext be used inside mapPartitions? My requirement is to register
> a set of data from HDFS as a temp table and to be able to do lookups from
> inside mapPartitions based on a key. If it is not supported, is there a
> different way of doing this?
>
> Thanks!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-SqlContext-be-used-inside-mapPartitions-tp25771.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
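One common alternative, sketched here under the assumption that the lookup data is small enough to collect (the path, column positions and the keyed RDD are hypothetical): build the lookup table on the driver, broadcast it, and consult it inside mapPartitions instead of a SQLContext.

  // Collect the (key, value) pairs on the driver and broadcast them.
  val lookupMap = sqlContext.read.parquet("hdfs:///lookup/path")
    .map(r => r.getString(0) -> r.getString(1))
    .collectAsMap()
  val lookupBc = sc.broadcast(lookupMap)

  // Do the per-key lookup inside the executors.
  val keys = sc.parallelize(Seq("k1", "k2", "k3"))
  val enriched = keys.mapPartitions { iter =>
    val m = lookupBc.value
    iter.map(k => (k, m.getOrElse(k, "unknown")))
  }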
Re: spark-submit is ignoring "--executor-cores"
BTW: it is not only a YARN web UI issue. With the capacity scheduler's default resource calculator, vcores are ignored. If you want YARN to honor vcore requests, you have to use the DominantResourceCalculator, as Saisai suggested.

Thanks.

Zhan Zhang

On Dec 21, 2015, at 5:30 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote: ... and you'll see the right vcores ...
Re: Spark with log4j
Hi Kalpesh,

If you are using Spark on YARN, this may not work, because you are writing logs to files other than stdout/stderr, and YARN log aggregation may not pick those up. As I understand it, YARN only aggregates the stdout/stderr logs, and the local cache will be deleted (within a configured timeframe). To check, while the application is running you can log into the container's box and look in the container's local cache to see whether the log file exists or not (after the app terminates, these local cache files will be deleted as well).

Thanks.

Zhan Zhang

On Dec 18, 2015, at 7:23 AM, Kalpesh Jadhav <kalpesh.jad...@citiustech.com> wrote:

Hi all,

I am new to Spark, and I am trying to use log4j for logging in my application, but the logs are not getting written to the specified file. I created the application using Maven and kept the log.properties file in the resources folder. The application is written in Scala.

If there is any alternative to log4j then that would also work, but I want to see the logs in a file. If any changes need to be made in Hortonworks for the Spark configuration, please mention that as well. If anyone has done this before, or any source is available on GitHub, please respond.

Thanks,
Kalpesh Jadhav
Re: number limit of map for spark
What I mean is to combine multiple map functions into one. Don’t know how exactly your algorithms works. Did your one iteration result depend on last iteration? If so, how do they depend on? I think either you can optimize your implementation, or Spark is not the right one for your specific application. Thanks. Zhan Zhang On Dec 21, 2015, at 10:43 AM, Zhiliang Zhu mailto:zchl.j...@yahoo.com.INVALID>> wrote: What is difference between repartition / collect and collapse ... Is collapse the same costly as collect or repartition ? Thanks in advance ~ On Tuesday, December 22, 2015 2:24 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: In what situation, you have such cases? If there is no shuffle, you can collapse all these functions into one, right? In the meantime, it is not recommended to collect all data to driver. Thanks. Zhan Zhang On Dec 21, 2015, at 3:44 AM, Zhiliang Zhu mailto:zchl.j...@yahoo.com.INVALID>> wrote: Dear All, I need to iterator some job / rdd quite a lot of times, but just lost in the problem of spark only accept to call around 350 number of map before it meets one action Function , besides, dozens of action will obviously increase the run time. Is there any proper way ... As tested, there is piece of codes as follows: .. 83 int count = 0; 84 JavaRDD dataSet = jsc.parallelize(list, 1).cache(); //with only 1 partition 85 int m = 350; 86 JavaRDD r = dataSet.cache(); 87 JavaRDD t = null; 88 89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the rdd r to t 90 if(null != t) { 91 r = t; 92 } //inner loop to call map 350 times , if m is much more than 350 (for instance, around 400), then the job will throw exception message "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.StackOverflowError java.lang.StackOverflowError") 93 for(int i=0; i < m; ++i) { 94 r = r.map(new Function() { 95 @Override 96 public Integer call(Integer integer) { 97 double x = Math.random() * 2 - 1; 98 double y = Math.random() * 2 - 1; 99 return (x * x + y * y < 1) ? 1 : 0; 100 } 101 }); 104 } 105 106 List lt = r.collect(); //then collect this rdd to get another rdd, however, dozens of action Function as collect is VERY MUCH COST 107 t = jsc.parallelize(lt, 1).cache(); 108 109 } 110 .. Thanks very much in advance! Zhiliang
Re: number limit of map for spark
In what situation, you have such cases? If there is no shuffle, you can collapse all these functions into one, right? In the meantime, it is not recommended to collect all data to driver. Thanks. Zhan Zhang On Dec 21, 2015, at 3:44 AM, Zhiliang Zhu mailto:zchl.j...@yahoo.com.INVALID>> wrote: Dear All, I need to iterator some job / rdd quite a lot of times, but just lost in the problem of spark only accept to call around 350 number of map before it meets one action Function , besides, dozens of action will obviously increase the run time. Is there any proper way ... As tested, there is piece of codes as follows: .. 83 int count = 0; 84 JavaRDD dataSet = jsc.parallelize(list, 1).cache(); //with only 1 partition 85 int m = 350; 86 JavaRDD r = dataSet.cache(); 87 JavaRDD t = null; 88 89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the rdd r to t 90 if(null != t) { 91 r = t; 92 } //inner loop to call map 350 times , if m is much more than 350 (for instance, around 400), then the job will throw exception message "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.StackOverflowError java.lang.StackOverflowError") 93 for(int i=0; i < m; ++i) { 94 r = r.map(new Function() { 95 @Override 96 public Integer call(Integer integer) { 97 double x = Math.random() * 2 - 1; 98 double y = Math.random() * 2 - 1; 99 return (x * x + y * y < 1) ? 1 : 0; 100 } 101 }); 104 } 105 106 List lt = r.collect(); //then collect this rdd to get another rdd, however, dozens of action Function as collect is VERY MUCH COST 107 t = jsc.parallelize(lt, 1).cache(); 108 109 } 110 .. Thanks very much in advance! Zhiliang
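A hedged sketch of the "collapse into one map" suggestion above, assuming dataSet is an RDD[Int] like the one parallelized in the quoted code: running the 350 iterations inside a single map keeps the lineage short, instead of chaining 350 map() calls that eventually overflow the stack.

  val m = 350
  val collapsed = dataSet.map { x =>
    var v = x
    var i = 0
    while (i < m) {
      val a = math.random * 2 - 1
      val b = math.random * 2 - 1
      v = if (a * a + b * b < 1) 1 else 0
      i += 1
    }
    v
  }
  collapsed.count()

  // If the iterations really must stay as separate transformations, periodic
  // checkpointing (sc.setCheckpointDir(...) plus rdd.checkpoint() every N
  // iterations) is the usual way to truncate the lineage without collecting
  // everything back to the driver.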
Re: Spark big rdd problem
There are two cases here. If the container is killed by YARN, you can increase the JVM overhead (spark.yarn.executor.memoryOverhead). Otherwise, you have to increase executor-memory, assuming there is no memory leak.

Thanks.

Zhan Zhang

On Dec 15, 2015, at 9:58 PM, Eran Witkon <eranwit...@gmail.com> wrote:

If the problem is containers trying to use more memory than they are allowed, how do I limit them? I already have executor-memory 5G.

Eran

On Tue, 15 Dec 2015 at 23:10 Zhan Zhang <zzh...@hortonworks.com> wrote:

You should be able to get the logs from YARN with "yarn logs -applicationId xxx", where you can possibly find the cause.

Thanks.

Zhan Zhang

On Dec 15, 2015, at 11:50 AM, Eran Witkon <eranwit...@gmail.com> wrote:

> When running
> val data = sc.wholeTextFiles("someDir/*"); data.count()
>
> I get numerous warnings from YARN until I get an akka association exception.
> Can someone explain what happens when Spark loads this RDD and can't fit it
> all in memory?
> Based on the exception it looks like the server is disconnecting from YARN
> and failing... Any idea why? The code is simple but still failing...
> Eran
Re: Spark big rdd problem
You should be able to get the logs from YARN with "yarn logs -applicationId xxx", where you can possibly find the cause.

Thanks.

Zhan Zhang

On Dec 15, 2015, at 11:50 AM, Eran Witkon wrote:

> When running
> val data = sc.wholeTextFiles("someDir/*"); data.count()
>
> I get numerous warnings from YARN until I get an akka association exception.
> Can someone explain what happens when Spark loads this RDD and can't fit it
> all in memory?
> Based on the exception it looks like the server is disconnecting from YARN
> and failing... Any idea why? The code is simple but still failing...
> Eran
Re: About Spark On Hbase
If you want DataFrame support, you can refer to https://github.com/zhzhan/shc, which I am working to integrate into HBase upstream alongside the existing support.

Thanks.

Zhan Zhang

On Dec 15, 2015, at 4:34 AM, censj <ce...@lotuseed.com> wrote:

Hi fightfate, can I use a Get to read a value first inside the bulkPut() function, and then put that value to HBase?

On Dec 9, 2015, at 4:02 PM, censj <ce...@lotuseed.com> wrote:

Thank you! I understand now.

On Dec 9, 2015, at 3:59 PM, fightf...@163.com wrote:

If you are using Maven, you can add the Cloudera Maven repo to the repositories in pom.xml and add the spark-hbase dependency. I also just found this: http://spark-packages.org/package/nerdammer/spark-hbase-connector; as Feng Dongyu recommends, you can try this as well, but I have no experience using it.

From: censj
Sent: 2015-12-09 15:44
To: fightf...@163.com
Cc: user@spark.apache.org
Subject: Re: About Spark On Hbase

So how do I get this jar? I use an sbt package project and did not find it in the sbt libs.

On Dec 9, 2015, at 3:42 PM, fightf...@163.com wrote:

I don't think it really needs any CDH component. Just use the API.

From: censj
Sent: 2015-12-09 15:31
To: fightf...@163.com
Cc: user@spark.apache.org
Subject: Re: About Spark On Hbase

But this depends on CDH, and I have not installed CDH.

On Dec 9, 2015, at 3:18 PM, fightf...@163.com wrote:

Actually you can refer to https://github.com/cloudera-labs/SparkOnHBase. Also, HBASE-13992 (https://issues.apache.org/jira/browse/HBASE-13992) already integrates that feature on the HBase side, but it has not been released yet.

Best,
Sun.

From: censj
Date: 2015-12-09 15:04
To: user@spark.apache.org
Subject: About Spark On Hbase

Hi all, I am using Spark, but I have not found an open-source way to operate on HBase from Spark. Can anyone point me to one?
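For readers who land on the shc repository, its usage is roughly the shape sketched below. Treat this as an assumption-laden illustration: the catalog JSON format, the HBaseTableCatalog option keys and the data source name should all be checked against the README of the version you actually pull in, and the DataFrame columns and HBase table here are hypothetical.

  import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

  // Hypothetical catalog mapping DataFrame columns to an HBase table.
  val catalog = """{
      "table":{"namespace":"default", "name":"contacts"},
      "rowkey":"key",
      "columns":{
        "id":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"personal", "col":"name", "type":"string"}
      }
    }"""

  // Write a DataFrame (with columns id, name) to HBase through the connector.
  df.write
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()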
Re: Multi-core support per task in Spark
I noticed that this is configurable at the job level via spark.task.cpus. Is there any way to support it at the task level?

Thanks.

Zhan Zhang

On Dec 11, 2015, at 10:46 AM, Zhan Zhang wrote:

> Hi Folks,
>
> Is it possible to assign multiple cores per task, and how? Suppose we have a
> scenario in which some tasks do really heavy processing of each record and
> require multi-threading, and we want to avoid similar tasks being assigned to
> the same executors/hosts.
>
> If it is not supported, does it make sense to add this feature? It may seem to
> make users worry about more configuration, but by default we can still do 1
> core per task and only advanced users need to be aware of this feature.
>
> Thanks.
>
> Zhan Zhang
Re: What is the relationship between reduceByKey and spark.driver.maxResultSize?
I think you are fetching too many results to the driver. Typically it is not recommended to collect much data to the driver, but if you have to, you can increase the driver memory when submitting the job.

Thanks.

Zhan Zhang

On Dec 11, 2015, at 6:14 AM, Tom Seddon <mr.tom.sed...@gmail.com> wrote:

I have a job that is running into intermittent errors with [SparkDriver] java.lang.OutOfMemoryError: Java heap space. Before I was getting this error, I was getting errors saying the result size exceeded spark.driver.maxResultSize. This does not make any sense to me, as there are no actions in my job that send data to the driver - just a pull of data from S3, a map and reduceByKey, and then a conversion to a dataframe and a saveAsTable action that puts the results back on S3.

I've found a few references to reduceByKey and spark.driver.maxResultSize having some importance, but cannot fathom how this setting could be related. Would greatly appreciate any advice.

Thanks in advance,
Tom
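For reference, a minimal sketch of the two knobs this thread touches on (values are illustrative only): spark.driver.maxResultSize caps the total serialized size of task results the driver will accept, while the driver heap itself must be raised before the driver JVM starts, so it is usually passed on the command line rather than set in code.

  // In code (only maxResultSize can safely be set this way):
  val conf = new org.apache.spark.SparkConf()
    .set("spark.driver.maxResultSize", "2g")   // default is 1g in Spark 1.x

  // On submit, the driver heap is raised like this (illustrative values):
  //   spark-submit --driver-memory 4g --conf spark.driver.maxResultSize=2g ...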
Re: Performance does not increase as the number of workers increasing in cluster mode
I am not sure about your data and model size, but intuitively there is a tradeoff between parallelism and network overhead. With the same data set and model, there is an optimum cluster size (performance may degrade at some point as the cluster size increases). You may want to test a larger data set if you want to do a performance benchmark.

Thanks.

Zhan Zhang

On Dec 11, 2015, at 9:34 AM, Wei Da <xwd0...@qq.com> wrote:

Hi all,

I have run a test with different hardware configurations of Spark 1.5.0. A KMeans algorithm was run in four different Spark environments: the first in local mode, the other three in cluster mode, with all nodes having the same CPU (6 cores) and memory (8G). The running times were recorded (inline table of running times not preserved). I thought the performance should improve as the number of workers increases, but the results show no obvious improvement. Does anybody know the reason? Thanks a lot in advance!

The number of rows in the test data is about 2.6 million; the input file is about 810M and is stored in HDFS. (A snapshot of the Spark Web UI was attached as an inline image.)

Wei Da
xwd0...@qq.com
Re: how to access local file from Spark sc.textFile("file:///path to/myfile")
As Sean mentioned, you cannot refer to a local file of the driver machine from the remote machines (executors). One workaround is to copy the file to all machines under the same directory.

Thanks.

Zhan Zhang

On Dec 11, 2015, at 10:26 AM, Lin, Hao <hao@finra.org> wrote:

of the master node
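Another workaround worth mentioning (a sketch, assuming the file is small; the path below is hypothetical) is to ship the file with the job instead of pre-copying it to every node: SparkContext.addFile distributes it, and SparkFiles.get resolves the local copy on whichever executor a task runs.

  import org.apache.spark.SparkFiles

  sc.addFile("/local/path/on/driver/myfile.txt")

  val lines = sc.parallelize(Seq(0)).flatMap { _ =>
    // Runs on an executor; SparkFiles.get points at that executor's local copy.
    scala.io.Source.fromFile(SparkFiles.get("myfile.txt")).getLines()
  }
  lines.count()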
Multi-core support per task in Spark
Hi Folks,

Is it possible to assign multiple cores per task, and how? Suppose we have a scenario in which some tasks do really heavy processing of each record and require multi-threading, and we want to avoid similar tasks being assigned to the same executors/hosts.

If it is not supported, does it make sense to add this feature? It may seem to make users worry about more configuration, but by default we can still do 1 core per task and only advanced users need to be aware of this feature.

Thanks.

Zhan Zhang
Re: DataFrames initial jdbc loading - will it be utilizing a filter predicate?
When you have the following query, the 'account === "acct1" filter will be pushed down, generating a new JDBC query with "where account = 'acct1'".

Thanks.

Zhan Zhang

On Nov 18, 2015, at 11:36 AM, Eran Medan <eran.me...@gmail.com> wrote:

I understand that the following are equivalent:

  df.filter('account === "acct1")

  sql("select * from tempTableName where account = 'acct1'")

But is Spark SQL "smart" enough to also push filter predicates down for the initial load? e.g.

  sqlContext.read.jdbc(...).filter('account === "acct1")

Is Spark "smart enough" to do this for each partition, i.e. 'select ... where account = 'acct1' AND (partition where clause here)'? Or do I have to put it in each partition's where clause, otherwise it will load the entire set and only then filter it in memory?
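If the goal is to guarantee that each JDBC partition only pulls its own slice, the per-partition predicates can also be supplied explicitly to read.jdbc, so they end up in each generated WHERE clause. A hedged sketch (URL, table, credentials and predicates are all hypothetical):

  import org.apache.spark.sql.functions.col

  val props = new java.util.Properties()
  props.setProperty("user", "etl")
  props.setProperty("password", "secret")

  // One predicate per partition; each becomes part of that partition's WHERE clause.
  val perPartitionPredicates = Array(
    "account = 'acct1' AND region = 'us-east'",
    "account = 'acct1' AND region = 'us-west'")

  val df = sqlContext.read.jdbc("jdbc:postgresql://db:5432/prod", "transactions",
    perPartitionPredicates, props)

  // A filter applied after the load is also pushed down to the JDBC source:
  val filtered = sqlContext.read.jdbc("jdbc:postgresql://db:5432/prod", "transactions", props)
    .filter(col("account") === "acct1")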
Re: Spark Thrift doesn't start
In the hive-site.xml, you can remove all configuration related to tez and give it a try again. Thanks. Zhan Zhang On Nov 10, 2015, at 10:47 PM, DaeHyun Ryu mailto:ry...@kr.ibm.com>> wrote: Hi folks, I configured tez as execution engine of Hive. After done that, whenever I started spark thrift server, it just stopped automatically. I checked log and saw the following messages. My spark version is 1.4.1 and tez version is 0.7.0 (IBM BigInsights 4.1) Does anyone have any idea on this ? java.lang.NoClassDefFoundError: org/apache/tez/dag/api/SessionNotRunning at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:353) at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:116) at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:163) at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:161) at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:168) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028) at $iwC$$iwC.(:9) at $iwC.(:18) at (:20) at .(:24) at .() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:130) at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122) at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324) at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122) at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974) at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:157) at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64) at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106) at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.Spark
Re: Anybody hit this issue in spark shell?
Thanks Ted. I am using latest master branch. I will try your build command and give it a try. Thank. Zhan Zhang On Nov 9, 2015, at 10:46 AM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: Which branch did you perform the build with ? I used the following command yesterday: mvn -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4 -Dhadoop.version=2.7.0 package -DskipTests Spark shell was working. Building with latest master branch. On Mon, Nov 9, 2015 at 10:37 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Folks, Does anybody meet the following issue? I use "mvn package -Phive -DskipTests” to build the package. Thanks. Zhan Zhang bin/spark-shell ... Spark context available as sc. error: error while loading QueryExecution, Missing dependency 'bad symbolic reference. A signature in QueryExecution.class refers to term annotations in package com.google.common which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling QueryExecution.class.', required by /Users/zzhang/repo/spark/assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.2.0.jar(org/apache/spark/sql/execution/QueryExecution.class) :10: error: not found: value sqlContext import sqlContext.implicits._ ^ :10: error: not found: value sqlContext import sqlContext.sql ^
Anybody hit this issue in spark shell?
Hi Folks, Does anybody meet the following issue? I use "mvn package -Phive -DskipTests” to build the package. Thanks. Zhan Zhang bin/spark-shell ... Spark context available as sc. error: error while loading QueryExecution, Missing dependency 'bad symbolic reference. A signature in QueryExecution.class refers to term annotations in package com.google.common which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling QueryExecution.class.', required by /Users/zzhang/repo/spark/assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.2.0.jar(org/apache/spark/sql/execution/QueryExecution.class) :10: error: not found: value sqlContext import sqlContext.implicits._ ^ :10: error: not found: value sqlContext import sqlContext.sql ^
Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell
Hi Jerry, https://issues.apache.org/jira/browse/SPARK-11562 is created for the issue. Thanks. Zhan Zhang On Nov 6, 2015, at 3:01 PM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi Zhan, Thank you for providing a workaround! I will try this out but I agree with Ted, there should be a better way to capture the exception and handle it by just initializing SQLContext instead of HiveContext. WARN the user that something is wrong with his hive setup. Having spark.sql.hive.enabled false configuration would be lovely too. :) Just an additional bonus is that it requires less memory if we don’t use HiveContext on the driver side (~100-200MB) from a rough observation. Thanks and have a nice weekend! Jerry On Nov 6, 2015, at 5:53 PM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: I would suggest adding a config parameter that allows bypassing initialization of HiveContext in case of SQLException Cheers On Fri, Nov 6, 2015 at 2:50 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Jerry, OK. Here is an ugly walk around. Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get a bunch of exceptions because hive context initialization failure, but you can initialize your SQLContext on your own. scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4a5cc2e8 scala> import sqlContext.implicits._ import sqlContext.implicits._ for example HW11188:spark zzhang$ more conf/hive-site.xml hive.metastore.uris thrift://zzhang-yarn11:9083 HW11188:spark zzhang$ By the way, I don’t know whether there is any caveat for this walk around. Thanks. Zhan Zhang On Nov 6, 2015, at 2:40 PM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi Zhan, I don’t use HiveContext features at all. I use mostly DataFrame API. It is sexier and much less typo. :) Also, HiveContext requires metastore database setup (derby by default). The problem is that I cannot have 2 spark-shell sessions running at the same time in the same host (e.g. /home/jerry directory). It will give me an exception like below. Since I don’t use HiveContext, I don’t see the need to maintain a database. What is interesting is that pyspark shell is able to start more than 1 session at the same time. I wonder what pyspark has done better than spark-shell? Best Regards, Jerry On Nov 6, 2015, at 5:28 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: If you assembly jar have hive jar included, the HiveContext will be used. Typically, HiveContext has more functionality than SQLContext. In what case you have to use SQLContext that cannot be done by HiveContext? Thanks. Zhan Zhang On Nov 6, 2015, at 10:43 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: What is interesting is that pyspark shell works fine with multiple session in the same host even though multiple HiveContext has been created. What does pyspark does differently in terms of starting up the shell? On Nov 6, 2015, at 12:12 PM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: In SQLContext.scala : // After we have populated SQLConf, we call setConf to populate other confs in the subclass // (e.g. hiveconf in HiveContext). properties.foreach { case (key, value) => setConf(key, value) } I don't see config of skipping the above call. FYI On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi spark users and developers, Is it possible to disable HiveContext from being instantiated when using spark-shell? I got the following errors when I have more than one session starts. 
Since I don't use HiveContext, it would be great if I can have more than 1 spark-shell start at the same time. Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS toreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522) at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183) at org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185) at or
Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell
I agree with minor change. Adding a config to provide the option to init SQLContext or HiveContext, with HiveContext as default instead of bypassing when hitting the Exception. Thanks. Zhan Zhang On Nov 6, 2015, at 2:53 PM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: I would suggest adding a config parameter that allows bypassing initialization of HiveContext in case of SQLException Cheers On Fri, Nov 6, 2015 at 2:50 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Jerry, OK. Here is an ugly walk around. Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get a bunch of exceptions because hive context initialization failure, but you can initialize your SQLContext on your own. scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4a5cc2e8 scala> import sqlContext.implicits._ import sqlContext.implicits._ for example HW11188:spark zzhang$ more conf/hive-site.xml hive.metastore.uris thrift://zzhang-yarn11:9083 HW11188:spark zzhang$ By the way, I don’t know whether there is any caveat for this walk around. Thanks. Zhan Zhang On Nov 6, 2015, at 2:40 PM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi Zhan, I don’t use HiveContext features at all. I use mostly DataFrame API. It is sexier and much less typo. :) Also, HiveContext requires metastore database setup (derby by default). The problem is that I cannot have 2 spark-shell sessions running at the same time in the same host (e.g. /home/jerry directory). It will give me an exception like below. Since I don’t use HiveContext, I don’t see the need to maintain a database. What is interesting is that pyspark shell is able to start more than 1 session at the same time. I wonder what pyspark has done better than spark-shell? Best Regards, Jerry On Nov 6, 2015, at 5:28 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: If you assembly jar have hive jar included, the HiveContext will be used. Typically, HiveContext has more functionality than SQLContext. In what case you have to use SQLContext that cannot be done by HiveContext? Thanks. Zhan Zhang On Nov 6, 2015, at 10:43 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: What is interesting is that pyspark shell works fine with multiple session in the same host even though multiple HiveContext has been created. What does pyspark does differently in terms of starting up the shell? On Nov 6, 2015, at 12:12 PM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: In SQLContext.scala : // After we have populated SQLConf, we call setConf to populate other confs in the subclass // (e.g. hiveconf in HiveContext). properties.foreach { case (key, value) => setConf(key, value) } I don't see config of skipping the above call. FYI On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi spark users and developers, Is it possible to disable HiveContext from being instantiated when using spark-shell? I got the following errors when I have more than one session starts. Since I don't use HiveContext, it would be great if I can have more than 1 spark-shell start at the same time. 
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS toreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522) at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183) at org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392) at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235) at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell
Hi Jerry, OK. Here is an ugly walk around. Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get a bunch of exceptions because hive context initialization failure, but you can initialize your SQLContext on your own. scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4a5cc2e8 scala> import sqlContext.implicits._ import sqlContext.implicits._ for example HW11188:spark zzhang$ more conf/hive-site.xml hive.metastore.uris thrift://zzhang-yarn11:9083 HW11188:spark zzhang$ By the way, I don’t know whether there is any caveat for this walk around. Thanks. Zhan Zhang On Nov 6, 2015, at 2:40 PM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi Zhan, I don’t use HiveContext features at all. I use mostly DataFrame API. It is sexier and much less typo. :) Also, HiveContext requires metastore database setup (derby by default). The problem is that I cannot have 2 spark-shell sessions running at the same time in the same host (e.g. /home/jerry directory). It will give me an exception like below. Since I don’t use HiveContext, I don’t see the need to maintain a database. What is interesting is that pyspark shell is able to start more than 1 session at the same time. I wonder what pyspark has done better than spark-shell? Best Regards, Jerry On Nov 6, 2015, at 5:28 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: If you assembly jar have hive jar included, the HiveContext will be used. Typically, HiveContext has more functionality than SQLContext. In what case you have to use SQLContext that cannot be done by HiveContext? Thanks. Zhan Zhang On Nov 6, 2015, at 10:43 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: What is interesting is that pyspark shell works fine with multiple session in the same host even though multiple HiveContext has been created. What does pyspark does differently in terms of starting up the shell? On Nov 6, 2015, at 12:12 PM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: In SQLContext.scala : // After we have populated SQLConf, we call setConf to populate other confs in the subclass // (e.g. hiveconf in HiveContext). properties.foreach { case (key, value) => setConf(key, value) } I don't see config of skipping the above call. FYI On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi spark users and developers, Is it possible to disable HiveContext from being instantiated when using spark-shell? I got the following errors when I have more than one session starts. Since I don't use HiveContext, it would be great if I can have more than 1 spark-shell start at the same time. 
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS toreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522) at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183) at org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392) at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235) at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.sql.SQLContext.(SQLContext.scala:234) at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.refle
Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell
If you assembly jar have hive jar included, the HiveContext will be used. Typically, HiveContext has more functionality than SQLContext. In what case you have to use SQLContext that cannot be done by HiveContext? Thanks. Zhan Zhang On Nov 6, 2015, at 10:43 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: What is interesting is that pyspark shell works fine with multiple session in the same host even though multiple HiveContext has been created. What does pyspark does differently in terms of starting up the shell? On Nov 6, 2015, at 12:12 PM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: In SQLContext.scala : // After we have populated SQLConf, we call setConf to populate other confs in the subclass // (e.g. hiveconf in HiveContext). properties.foreach { case (key, value) => setConf(key, value) } I don't see config of skipping the above call. FYI On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam mailto:chiling...@gmail.com>> wrote: Hi spark users and developers, Is it possible to disable HiveContext from being instantiated when using spark-shell? I got the following errors when I have more than one session starts. Since I don't use HiveContext, it would be great if I can have more than 1 spark-shell start at the same time. Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS toreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522) at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183) at org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392) at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235) at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.sql.SQLContext.(SQLContext.scala:234) at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028) at org.apache.spark.repl.SparkILoopExt.importSpark(SparkILoopExt.scala:154) at org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply$mcZ$sp(SparkILoopExt.scala:127) at org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113) at 
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113) Best Regards, Jerry
Re: Upgrade spark cluster to latest version
Spark is a client library. You can just download the latest release or build it on your own, and replace your existing one without changing your existing cluster. Thanks. Zhan Zhang On Nov 3, 2015, at 3:58 PM, roni mailto:roni.epi...@gmail.com>> wrote: Hi Spark experts, This may be a very naive question but can you pl. point me to a proper way to upgrade the spark version on an existing cluster. Thanks Roni Hi, I have a current cluster running spark 1.4 and want to upgrade to the latest version. How can I do it without creating a new cluster, so that all my other settings don't get erased. Thanks _R
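A rough sketch of the client-side upgrade described above (the version, mirror and paths are examples, not taken from the original mail):

wget https://archive.apache.org/dist/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz
tar -xzf spark-1.5.2-bin-hadoop2.6.tgz
# reuse the existing cluster configuration so the new client talks to the same YARN/HDFS
export HADOOP_CONF_DIR=/etc/hadoop/conf
./spark-1.5.2-bin-hadoop2.6/bin/spark-shell --master yarn-client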
Re: Vague Spark SQL error message with saveAsParquetFile
Looks like some JVM got killed or ran out of memory (OOM). You can check the logs to see the real cause. Thanks. Zhan Zhang On Nov 3, 2015, at 9:23 AM, YaoPau mailto:jonrgr...@gmail.com>> wrote: java.io.FileNotFoun
Re: sql query orc slow
Hi Patcharee, I am not sure which side is wrong, driver or executor. If it is executor side, the reason you mentioned may be possible. But if the driver side didn’t set the predicate at all, then somewhere else is broken. Can you please file a JIRA with a simple reproduce step, and let me know the JIRA number? Thanks. Zhan Zhang On Oct 13, 2015, at 1:01 AM, Patcharee Thongtra mailto:patcharee.thong...@uni.no>> wrote: Hi Zhan Zhang, Is my problem (which is ORC predicate is not generated from WHERE clause even though spark.sql.orc.filterPushdown=true) can be related to some factors below ? - orc file version (File Version: 0.12 with HIVE_8732) - hive version (using Hive 1.2.1.2.3.0.0-2557) - orc table is not sorted / indexed - the split strategy hive.exec.orc.split.strategy BR, Patcharee On 10/09/2015 08:01 PM, Zhan Zhang wrote: That is weird. Unfortunately, there is no debug info available on this part. Can you please open a JIRA to add some debug information on the driver side? Thanks. Zhan Zhang On Oct 9, 2015, at 10:22 AM, patcharee <<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote: I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"). But from the log No ORC pushdown predicate for my query with WHERE clause. 15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate I did not understand what wrong with this. BR, Patcharee On 09. okt. 2015 19:10, Zhan Zhang wrote: In your case, you manually set an AND pushdown, and the predicate is right based on your setting, : leaf-0 = (EQUALS x 320) The right way is to enable the predicate pushdown as follows. sqlContext.setConf("spark.sql.orc.filterPushdown", "true”) Thanks. Zhan Zhang On Oct 9, 2015, at 9:58 AM, patcharee <<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote: Hi Zhan Zhang Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is not partition column, the others are partition columns. I expected the system will use predicate pushdown. I turned on the debug and found pushdown predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate") Then I tried to set the search argument explicitly (on the column "x" which is not partition column) val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 320).end().build() hiveContext.setConf("hive.io.file.readcolumn.names", "x") hiveContext.setConf("sarg.pushdown", xs.toKryo()) this time in the log pushdown predicate was generated but results was wrong (no results at all) 15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS x 320) expr = leaf-0 Any ideas What wrong with this? Why the ORC pushdown predicate is not applied by the system? BR, Patcharee On 09. okt. 2015 18:31, Zhan Zhang wrote: Hi Patcharee, >From the query, it looks like only the column pruning will be applied. >Partition pruning and predicate pushdown does not have effect. Do you see big >IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. 
Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee <<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote: Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee <<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, --
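For reference, a minimal sketch of the setting discussed in this thread (Spark 1.5; the table path and the filter column are taken from the examples above):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// enable ORC predicate pushdown before reading the files
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")
// with pushdown enabled, the x = 320 predicate should show up as an ORC pushdown predicate in the logs
hiveContext.sql("select col1, col2 from regTable where x = 320").show()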
Re: sql query orc slow
That is weird. Unfortunately, there is no debug info available on this part. Can you please open a JIRA to add some debug information on the driver side? Thanks. Zhan Zhang On Oct 9, 2015, at 10:22 AM, patcharee mailto:patcharee.thong...@uni.no>> wrote: I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"). But from the log No ORC pushdown predicate for my query with WHERE clause. 15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate I did not understand what wrong with this. BR, Patcharee On 09. okt. 2015 19:10, Zhan Zhang wrote: In your case, you manually set an AND pushdown, and the predicate is right based on your setting, : leaf-0 = (EQUALS x 320) The right way is to enable the predicate pushdown as follows. sqlContext.setConf("spark.sql.orc.filterPushdown", "true”) Thanks. Zhan Zhang On Oct 9, 2015, at 9:58 AM, patcharee <<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote: Hi Zhan Zhang Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is not partition column, the others are partition columns. I expected the system will use predicate pushdown. I turned on the debug and found pushdown predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate") Then I tried to set the search argument explicitly (on the column "x" which is not partition column) val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 320).end().build() hiveContext.setConf("hive.io.file.readcolumn.names", "x") hiveContext.setConf("sarg.pushdown", xs.toKryo()) this time in the log pushdown predicate was generated but results was wrong (no results at all) 15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS x 320) expr = leaf-0 Any ideas What wrong with this? Why the ORC pushdown predicate is not applied by the system? BR, Patcharee On 09. okt. 2015 18:31, Zhan Zhang wrote: Hi Patcharee, >From the query, it looks like only the column pruning will be applied. >Partition pruning and predicate pushdown does not have effect. Do you see big >IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee <<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote: Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee <<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. 
Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: <mailto:user-h...@spark.apache.org> user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
Re: sql query orc slow
In your case, you manually set an AND pushdown, and the predicate is right based on your setting, : leaf-0 = (EQUALS x 320) The right way is to enable the predicate pushdown as follows. sqlContext.setConf("spark.sql.orc.filterPushdown", "true”) Thanks. Zhan Zhang On Oct 9, 2015, at 9:58 AM, patcharee mailto:patcharee.thong...@uni.no>> wrote: Hi Zhan Zhang Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is not partition column, the others are partition columns. I expected the system will use predicate pushdown. I turned on the debug and found pushdown predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate") Then I tried to set the search argument explicitly (on the column "x" which is not partition column) val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 320).end().build() hiveContext.setConf("hive.io.file.readcolumn.names", "x") hiveContext.setConf("sarg.pushdown", xs.toKryo()) this time in the log pushdown predicate was generated but results was wrong (no results at all) 15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS x 320) expr = leaf-0 Any ideas What wrong with this? Why the ORC pushdown predicate is not applied by the system? BR, Patcharee On 09. okt. 2015 18:31, Zhan Zhang wrote: Hi Patcharee, >From the query, it looks like only the column pruning will be applied. >Partition pruning and predicate pushdown does not have effect. Do you see big >IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee mailto:patcharee.thong...@uni.no>> wrote: Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee mailto:patcharee.thong...@uni.no>> wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
Re: sql query orc slow
Hi Patcharee, >From the query, it looks like only the column pruning will be applied. >Partition pruning and predicate pushdown does not have effect. Do you see big >IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee wrote: > Yes, the predicate pushdown is enabled, but still take longer time than the > first method > > BR, > Patcharee > > On 08. okt. 2015 18:43, Zhan Zhang wrote: >> Hi Patcharee, >> >> Did you enable the predicate pushdown in the second method? >> >> Thanks. >> >> Zhan Zhang >> >> On Oct 8, 2015, at 1:43 AM, patcharee wrote: >> >>> Hi, >>> >>> I am using spark sql 1.5 to query a hive table stored as partitioned orc >>> file. We have the total files is about 6000 files and each file size is >>> about 245MB. >>> >>> What is the difference between these two query methods below: >>> >>> 1. Using query on hive table directly >>> >>> hiveContext.sql("select col1, col2 from table1") >>> >>> 2. Reading from orc file, register temp table and query from the temp table >>> >>> val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") >>> c.registerTempTable("regTable") >>> hiveContext.sql("select col1, col2 from regTable") >>> >>> When the number of files is large (query all from the total 6000 files) , >>> the second case is much slower then the first one. Any ideas why? >>> >>> BR, >>> >>> >>> >>> >>> - >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sql query orc slow
Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee wrote: > Hi, > > I am using spark sql 1.5 to query a hive table stored as partitioned orc > file. We have the total files is about 6000 files and each file size is about > 245MB. > > What is the difference between these two query methods below: > > 1. Using query on hive table directly > > hiveContext.sql("select col1, col2 from table1") > > 2. Reading from orc file, register temp table and query from the temp table > > val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") > c.registerTempTable("regTable") > hiveContext.sql("select col1, col2 from regTable") > > When the number of files is large (query all from the total 6000 files) , the > second case is much slower then the first one. Any ideas why? > > BR, > > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to submit the spark job outside the cluster
Hi Zhiliang, I cannot find a specific doc. But as far as I remember, you can log in one of your cluster machine, and find the hadoop configuration location, for example /etc/hadoop/conf, copy that directory to your local machine. Typically it has hdfs-site.xml, yarn-site.xml etc. In spark, the former is used to access hdfs, and the latter is used to launch application on top of yarn. Then in the spark-env.sh, you add export HADOOP_CONF_DIR=/etc/hadoop/conf. Thanks. Zhan Zhang On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu mailto:zchl.j...@yahoo.com>> wrote: Hi Zhan, Yes, I get it now. I have not ever deployed hadoop configuration locally, and do not find the specific doc, would you help provide the doc to do that... Thank you, Zhiliang On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: There is no difference between running the client in or out of the client (assuming there is no firewall or network connectivity issue), as long as you have hadoop configuration locally. Here is the doc for running on yarn. http://spark.apache.org/docs/latest/running-on-yarn.html Thanks. Zhan Zhang On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu mailto:zchl.j...@yahoo.com>> wrote: Hi Zhan, Thanks very much for your help comment. I also view it would be similar to hadoop job submit, however, I was not deciding whether it is like that when it comes to spark. Have you ever tried that for spark... Would you give me the deployment doc for hadoop and spark gateway, since this is the first time for me to do that, I do not find the specific doc for it. Best Regards, Zhiliang On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: It should be similar to other hadoop jobs. You need hadoop configuration in your client machine, and point the HADOOP_CONF_DIR in spark to the configuration. Thanks Zhan Zhang On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu mailto:zchl.j...@yahoo.com.INVALID>> wrote: Dear Experts, Spark job is running on the cluster by yarn. Since the job can be submited at the place on the machine from the cluster, however, I would like to submit the job from another machine which does not belong to the cluster. I know for this, hadoop job could be done by way of another machine which is installed hadoop gateway which is used to connect the cluster. Then what would go for spark, is it same as hadoop... And where is the instruction doc for installing this gateway... Thank you very much~~ Zhiliang
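A rough sketch of the gateway setup described above (host names, paths, class and jar names are placeholders):

# on the external client machine: copy the cluster's hadoop client configuration locally
scp -r user@cluster-node:/etc/hadoop/conf /etc/hadoop/conf
# point Spark at it, e.g. in $SPARK_HOME/conf/spark-env.sh
export HADOOP_CONF_DIR=/etc/hadoop/conf
# then launch on YARN as usual
$SPARK_HOME/bin/spark-submit --master yarn-cluster --class com.example.MyApp my-app.jar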
Re: how to submit the spark job outside the cluster
There is no difference between running the client in or out of the client (assuming there is no firewall or network connectivity issue), as long as you have hadoop configuration locally. Here is the doc for running on yarn. http://spark.apache.org/docs/latest/running-on-yarn.html Thanks. Zhan Zhang On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu mailto:zchl.j...@yahoo.com>> wrote: Hi Zhan, Thanks very much for your help comment. I also view it would be similar to hadoop job submit, however, I was not deciding whether it is like that when it comes to spark. Have you ever tried that for spark... Would you give me the deployment doc for hadoop and spark gateway, since this is the first time for me to do that, I do not find the specific doc for it. Best Regards, Zhiliang On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: It should be similar to other hadoop jobs. You need hadoop configuration in your client machine, and point the HADOOP_CONF_DIR in spark to the configuration. Thanks Zhan Zhang On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu mailto:zchl.j...@yahoo.com.INVALID>> wrote: Dear Experts, Spark job is running on the cluster by yarn. Since the job can be submited at the place on the machine from the cluster, however, I would like to submit the job from another machine which does not belong to the cluster. I know for this, hadoop job could be done by way of another machine which is installed hadoop gateway which is used to connect the cluster. Then what would go for spark, is it same as hadoop... And where is the instruction doc for installing this gateway... Thank you very much~~ Zhiliang
Re: how to submit the spark job outside the cluster
It should be similar to other hadoop jobs. You need hadoop configuration in your client machine, and point the HADOOP_CONF_DIR in spark to the configuration. Thanks Zhan Zhang On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu mailto:zchl.j...@yahoo.com.INVALID>> wrote: Dear Experts, Spark job is running on the cluster by yarn. Since the job can be submited at the place on the machine from the cluster, however, I would like to submit the job from another machine which does not belong to the cluster. I know for this, hadoop job could be done by way of another machine which is installed hadoop gateway which is used to connect the cluster. Then what would go for spark, is it same as hadoop... And where is the instruction doc for installing this gateway... Thank you very much~~ Zhiliang
Re: HDP 2.3 support for Spark 1.5.x
Hi Krishna, For the time being, you can download from upstream, and it should run OK on HDP 2.3. For HDP-specific problems, you can ask in the Hortonworks forum. Thanks. Zhan Zhang On Sep 22, 2015, at 3:42 PM, Krishna Sankar mailto:ksanka...@gmail.com>> wrote: Guys, * We have HDP 2.3 installed just now. It comes with Spark 1.3.x. The current wisdom is that it will support the 1.4.x train (which is good, need DataFrame et al). * What is the plan to support Spark 1.5.x ? Can we install 1.5.0 on HDP 2.3 ? Or will Spark 1.5.x support be in HDP 2.3.x and if so ~when ? Cheers & Thanks
Re: PrunedFilteredScan does not work for UDTs and Struct fields
Hi Richard, I am not sure how to support user-defined types. But regarding your second question, there is a workaround. Suppose you have a struct column a and want to filter on a.c with a.c > X. You can define an alias C for a.c, add the extra column C to the schema of the relation, and write your query as C > X instead of a.c > X. In this way, buildScan will receive GreaterThan(C, X), and you can programmatically convert C back to a.c. Note that the required columns passed to buildScan will also include the extra column C, which you need to return in the buildScan RDD. It looks complicated, but I think it would work. Thanks. Zhan Zhang From: Richard Eggert Sent: Saturday, September 19, 2015 3:59 PM To: User Subject: PrunedFilteredScan does not work for UDTs and Struct fields I defined my own relation (extending BaseRelation) and implemented the PrunedFilteredScan interface, but discovered that if the column referenced in a WHERE = clause is a user-defined type or a field of a struct column, then Spark SQL passes NO filters to the PrunedFilteredScan.buildScan method, rendering the interface useless. Is there really no way to implement a relation to optimize on such fields? -- Rich
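Below is a rough, hypothetical sketch of the alias-column workaround described above, against the Spark 1.x data sources API. The struct column a with field c and the alias column C mirror the example in the mail; the class name and the backing data are made up:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

class StructRelation(val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  // expose both the struct column a and a top-level alias C for a.c,
  // so that filters written against C are pushed into buildScan
  override def schema: StructType = StructType(Seq(
    StructField("a", StructType(Seq(StructField("c", IntegerType)))),
    StructField("C", IntegerType)))

  private val cValues = Seq(1, 5, 10, 20)  // toy backing data: one a.c value per row

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // translate the pushed filters on the alias C back to the nested field a.c
    val keep: Int => Boolean = v => filters.forall {
      case GreaterThan("C", x: Int) => v > x
      case EqualTo("C", x: Int)     => v == x
      case _                        => true  // anything unhandled is re-checked by Spark
    }
    val rows = cValues.filter(keep).map { c =>
      Row(requiredColumns.map {
        case "a" => Row(c)  // rebuild the struct value
        case "C" => c       // the alias column itself must also be returned
      }: _*)
    }
    sqlContext.sparkContext.parallelize(rows)
  }
}

With such a relation, sqlContext.baseRelationToDataFrame(new StructRelation(sqlContext)).filter("C > 4") delivers GreaterThan("C", 4) to buildScan, whereas the equivalent filter written against a.c would not be pushed down at all.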
Re: Error when saving a dataframe as ORC file
If you are using spark-1.4.0, probably it is caused by SPARK-8458<https://issues.apache.org/jira/browse/SPARK-8458> Thanks. Zhan Zhang On Aug 23, 2015, at 12:49 PM, lostrain A mailto:donotlikeworkingh...@gmail.com>> wrote: Ted, Thanks for the suggestions. Actually I tried both s3n and s3 and the result remains the same. On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: In your case, I would specify "fs.s3.awsAccessKeyId" / "fs.s3.awsSecretAccessKey" since you use s3 protocol. On Sun, Aug 23, 2015 at 11:03 AM, lostrain A mailto:donotlikeworkingh...@gmail.com>> wrote: Hi Ted, Thanks for the reply. I tried setting both of the keyid and accesskey via sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**") However, the error still occurs for ORC format. If I change the format to JSON, although the error does not go, the JSON files can be saved successfully. On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI On Aug 23, 2015, at 1:01 AM, lostrain A mailto:donotlikeworkingh...@gmail.com>> wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ val df=sc.parallelize(1 to 1000).toDF() df.write.format("orc").save("s3://logs/dummy) I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory. The last part of the spark-shell log said: 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100) 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 (save at :29) finished in 0.834 s 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at :29, took 0.895912 s 15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: /media/ephemeral0/s3/output- 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78, 4, -23, -103, 9, -104, -20, -8, 66, 126] 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed. Anyone has experienced this before? Thanks!
Re: Authentication Support with spark-submit cluster mode
If you run it on YARN with a Kerberos setup, you authenticate yourself with kinit before launching the job. Thanks. Zhan Zhang On Jul 28, 2015, at 8:51 PM, Anh Hong mailto:hongnhat...@yahoo.com.INVALID>> wrote: Hi, I'd like to remotely run spark-submit from a local machine to submit a job to the spark cluster (cluster mode). What method do I use to authenticate myself to the cluster? Like how to pass a user id, password or private key to the cluster. Any help is appreciated.
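A short sketch of that flow (the principal, class and jar names are placeholders):

# obtain a Kerberos ticket first
kinit myuser@EXAMPLE.COM
# then submit as usual; the job runs with the authenticated user's credentials
spark-submit --master yarn-cluster --class com.example.MyApp my-app.jar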
Re: [SPAM] Customized Aggregation Query on Spark SQL
One optimization is to reduce the shuffle by first aggregating locally (keeping only the max for each name) and then doing reduceByKey. Thanks. Zhan Zhang On Apr 24, 2015, at 10:03 PM, ayan guha mailto:guha.a...@gmail.com>> wrote: Here you go

t = [["A",10,"A10"],["A",20,"A20"],["A",30,"A30"],["B",15,"B15"],["C",10,"C10"],["C",20,"C200"]]
TRDD = sc.parallelize(t).map(lambda t: Row(name=str(t[0]),age=int(t[1]),other=str(t[2])))
TDF = ssc.createDataFrame(TRDD)
print TDF.printSchema()
TDF.registerTempTable("tab")
JN = ssc.sql("select t.name,t.age,t.other from tab t inner join (select name,max(age) age from tab group by name) t1 on t.name=t1.name and t.age=t1.age")
for i in JN.collect(): print i

Result:
Row(name=u'A', age=30, other=u'A30')
Row(name=u'B', age=15, other=u'B15')
Row(name=u'C', age=20, other=u'C200')

On Sat, Apr 25, 2015 at 2:48 PM, Wenlei Xie mailto:wenlei@gmail.com>> wrote: Sure. A simple example of data would be (there might be many other columns)

Name  Age  Other
A     10   A10
A     20   A20
A     30   A30
B     15   B15
C     10   C10
C     20   C20

The desired output would be

Name  Age  Other
A     30   A30
B     15   B15
C     20   C20

Thank you so much for the help! On Sat, Apr 25, 2015 at 12:41 AM, ayan guha mailto:guha.a...@gmail.com>> wrote: can you give an example set of data and desired output? On Sat, Apr 25, 2015 at 2:32 PM, Wenlei Xie mailto:wenlei@gmail.com>> wrote: Hi, I would like to answer the following customized aggregation query on Spark SQL 1. Group the table by the value of Name 2. For each group, choose the tuple with the max value of Age (the ages are distinct for every name) I am wondering what's the best way to do it on Spark SQL? Should I use a UDAF? Previously I am doing something like the following on Spark: personRDD.map(t => (t.name, t)).reduceByKey((a, b) => if (a.age > b.age) a else b) Thank you! Best, Wenlei -- Best Regards, Ayan Guha -- Wenlei Xie (谢文磊) Ph.D. Candidate Department of Computer Science 456 Gates Hall, Cornell University Ithaca, NY 14853, USA Email: wenlei@gmail.com<mailto:wenlei@gmail.com> -- Best Regards, Ayan Guha
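For reference, a Scala sketch of the two approaches discussed in this thread (the case class and the sample rows follow the example data; Spark 1.3+, run in spark-shell):

import org.apache.spark.sql.functions.max
import sqlContext.implicits._

case class Person(name: String, age: Int, other: String)
val df = sc.parallelize(Seq(
  Person("A", 10, "A10"), Person("A", 20, "A20"), Person("A", 30, "A30"),
  Person("B", 15, "B15"), Person("C", 10, "C10"), Person("C", 20, "C20"))).toDF()

// RDD style: reduceByKey combines map-side first, so only one row per name
// leaves each partition before the shuffle
val topRows = df.rdd
  .map(r => (r.getString(0), r))
  .reduceByKey((a, b) => if (a.getInt(1) > b.getInt(1)) a else b)
  .values

// DataFrame style: join back against the per-name max age
val maxAge = df.groupBy("name").agg(max("age").as("age"))
val joined = df.as("t").join(maxAge.as("m"),
    $"t.name" === $"m.name" && $"t.age" === $"m.age")
  .select($"t.name", $"t.age", $"t.other")
joined.show()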
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Besides the hdp.version in spark-defaults.conf, I think you probably forget to put the file java-opts under $SPARK_HOME/conf with following contents. [root@c6402 conf]# pwd /usr/hdp/current/spark-client/conf [root@c6402 conf]# ls fairscheduler.xml.template java-opts log4j.properties.template metrics.properties.template spark-defaults.conf spark-env.sh hive-site.xml log4j.properties metrics.properties slaves.template spark-defaults.conf.template spark-env.sh.template [root@c6402 conf]# more java-opts -Dhdp.version=2.2.0.0-2041 [root@c6402 conf]# Thanks. Zhan Zhang On Apr 17, 2015, at 3:09 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: Hi, This is the log trace: https://gist.github.com/uditmehta27/511eac0b76e6d61f8b47 On the yarn RM UI, I see : Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher The command I run is: bin/spark-shell --master yarn-client The spark defaults I use is: spark.yarn.jar hdfs://namenode1-dev.snc1:8020/spark/spark-assembly-1.3.0-hadoop2.4.0.jar spark.yarn.access.namenodes hdfs://namenode1-dev.snc1:8032 spark.dynamicAllocation.enabled false spark.scheduler.mode FAIR spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 Is there anything wrong in what I am trying to do? thanks again! On Fri, Apr 17, 2015 at 2:56 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Udit, By the way, do you mind to share the whole log trace? Thanks. Zhan Zhang On Apr 17, 2015, at 2:26 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: I am just trying to launch a spark shell and not do anything fancy. I got the binary distribution from apache and put the spark assembly on hdfs. I then specified the yarn.jars option in spark defaults to point to the assembly in hdfs. I still got the same error so though I had to build it for hdp. I am using hdp 2.2 with hadoop 2.6/ On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: Thanks. Would that distribution work for hdp 2.2? On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: You don’t need to put any yarn assembly in hdfs. The spark assembly jar will include everything. It looks like your package does not include yarn module, although I didn’t find anything wrong in your mvn command. Can you check whether the ExecutorLauncher class is in your jar file or not? BTW: For spark-1.3, you can use the binary distribution from apache. Thanks. Zhan Zhang On Apr 17, 2015, at 2:01 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Folks, Just to summarize it to run SPARK on HDP distribution. 1. The spark version has to be 1.3.0 and above if you are using upstream distribution. This configuration is mainly for HDP rolling upgrade purpose, and the patch only went into spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/sp[ark-defaults.conf, adding following settings. spark.driver.extraJavaOptions -Dhdp.version=x spark.yarn.am.extraJavaOptions -Dhdp.version=x 3. In $SPARK_HOME/java-opts, add following options. 
-Dhdp.version=x Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog mailto:doug.sparku...@dugos.com>> wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mailto:mst...@mathom.us>> wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the "bad substitution" message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Udit, By the way, do you mind to share the whole log trace? Thanks. Zhan Zhang On Apr 17, 2015, at 2:26 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: I am just trying to launch a spark shell and not do anything fancy. I got the binary distribution from apache and put the spark assembly on hdfs. I then specified the yarn.jars option in spark defaults to point to the assembly in hdfs. I still got the same error so though I had to build it for hdp. I am using hdp 2.2 with hadoop 2.6/ On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: Thanks. Would that distribution work for hdp 2.2? On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: You don’t need to put any yarn assembly in hdfs. The spark assembly jar will include everything. It looks like your package does not include yarn module, although I didn’t find anything wrong in your mvn command. Can you check whether the ExecutorLauncher class is in your jar file or not? BTW: For spark-1.3, you can use the binary distribution from apache. Thanks. Zhan Zhang On Apr 17, 2015, at 2:01 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Folks, Just to summarize it to run SPARK on HDP distribution. 1. The spark version has to be 1.3.0 and above if you are using upstream distribution. This configuration is mainly for HDP rolling upgrade purpose, and the patch only went into spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/sp[ark-defaults.conf, adding following settings. spark.driver.extraJavaOptions -Dhdp.version=x spark.yarn.am.extraJavaOptions -Dhdp.version=x 3. In $SPARK_HOME/java-opts, add following options. -Dhdp.version=x Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog mailto:doug.sparku...@dugos.com>> wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mailto:mst...@mathom.us>> wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the "bad substitution" message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
You probably want to first try the basic configuration to see whether it works, instead of setting SPARK_JAR pointing to the hdfs location. This error is caused by not finding ExecutorLauncher in class path, and not HDP specific, I think. Thanks. Zhan Zhang On Apr 17, 2015, at 2:26 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: I am just trying to launch a spark shell and not do anything fancy. I got the binary distribution from apache and put the spark assembly on hdfs. I then specified the yarn.jars option in spark defaults to point to the assembly in hdfs. I still got the same error so though I had to build it for hdp. I am using hdp 2.2 with hadoop 2.6/ On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: Thanks. Would that distribution work for hdp 2.2? On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: You don’t need to put any yarn assembly in hdfs. The spark assembly jar will include everything. It looks like your package does not include yarn module, although I didn’t find anything wrong in your mvn command. Can you check whether the ExecutorLauncher class is in your jar file or not? BTW: For spark-1.3, you can use the binary distribution from apache. Thanks. Zhan Zhang On Apr 17, 2015, at 2:01 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Folks, Just to summarize it to run SPARK on HDP distribution. 1. The spark version has to be 1.3.0 and above if you are using upstream distribution. This configuration is mainly for HDP rolling upgrade purpose, and the patch only went into spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/sp[ark-defaults.conf, adding following settings. spark.driver.extraJavaOptions -Dhdp.version=x spark.yarn.am.extraJavaOptions -Dhdp.version=x 3. In $SPARK_HOME/java-opts, add following options. -Dhdp.version=x Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog mailto:doug.sparku...@dugos.com>> wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mailto:mst...@mathom.us>> wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the "bad substitution" message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
You don’t need to put any yarn assembly in hdfs. The spark assembly jar will include everything. It looks like your package does not include yarn module, although I didn’t find anything wrong in your mvn command. Can you check whether the ExecutorLauncher class is in your jar file or not? BTW: For spark-1.3, you can use the binary distribution from apache. Thanks. Zhan Zhang On Apr 17, 2015, at 2:01 PM, Udit Mehta mailto:ume...@groupon.com>> wrote: I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Folks, Just to summarize it to run SPARK on HDP distribution. 1. The spark version has to be 1.3.0 and above if you are using upstream distribution. This configuration is mainly for HDP rolling upgrade purpose, and the patch only went into spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/sp[ark-defaults.conf, adding following settings. spark.driver.extraJavaOptions -Dhdp.version=x spark.yarn.am.extraJavaOptions -Dhdp.version=x 3. In $SPARK_HOME/java-opts, add following options. -Dhdp.version=x Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog mailto:doug.sparku...@dugos.com>> wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mailto:mst...@mathom.us>> wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the "bad substitution" message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
Re: Spark 1.3.0: Running Pi example on YARN fails
Hi Zork, >From the exception, it is still caused by hdp.version not being propagated >correctly. Can you check whether there is any typo? [root@c6402 conf]# more java-opts -Dhdp.version=2.2.0.0–2041 [root@c6402 conf]# more spark-defaults.conf spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0–2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0–2041 This is HDP specific question, and you can move the topic to HDP forum. Thanks. Zhan Zhang On Apr 13, 2015, at 3:00 AM, Zork Sail mailto:zorks...@gmail.com>> wrote: Hi Zhan, Alas setting: -Dhdp.version=2.2.0.0–2041 Does not help. Still get the same error: 15/04/13 09:53:59 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: default start time: 1428918838408 final status: UNDEFINED tracking URL: http://foo.bar.site:8088/proxy/application_1427875242006_0037/ user: test 15/04/13 09:54:00 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED) 15/04/13 09:54:01 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED) 15/04/13 09:54:02 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED) 15/04/13 09:54:03 INFO yarn.Client: Application report for application_1427875242006_0037 (state: FAILED) 15/04/13 09:54:03 INFO yarn.Client: client token: N/A diagnostics: Application application_1427875242006_0037 failed 2 times due to AM Container for appattempt_1427875242006_0037_02 exited with exitCode: 1 For more detailed output, check application tracking page:http://foo.bar.site:8088/proxy/application_1427875242006_0037/Then, click on links to logs of each attempt. Diagnostics: Exception from container-launch. Container id: container_1427875242006_0037_02_01 Exit code: 1 Exception message: /mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh: line 27: $PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure: bad substitution Stack trace: ExitCodeException exitCode=1: /mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh: line 27: 
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure: bad substitution at org.apache.hadoop.util.Shell.runCommand(Shell.java:538) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Container exited with a non-zero exit code 1 Failing this attempt. Failing the application. ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: default start time: 1428918838408 final status: FAILED tracking URL: http://foo.bar.site:8088
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Folks, Just to summarize how to run Spark on the HDP distribution: 1. The Spark version has to be 1.3.0 or above if you are using the upstream distribution. This configuration is mainly for HDP rolling-upgrade purposes, and the patch only went into Spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/spark-defaults.conf, add the following settings.
spark.driver.extraJavaOptions -Dhdp.version=x
spark.yarn.am.extraJavaOptions -Dhdp.version=x
3. In $SPARK_HOME/java-opts, add the following option.
-Dhdp.version=x
Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog mailto:doug.sparku...@dugos.com>> wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mailto:mst...@mathom.us>> wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the "bad substitution" message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
Re: Can't access file in spark, but can in hadoop
Probably guava version conflicts issue. What spark version did you use, and which hadoop version it compile against? Thanks. Zhan Zhang On Mar 27, 2015, at 12:13 PM, Johnson, Dale mailto:daljohn...@ebay.com>> wrote: Yes, I could recompile the hdfs client with more logging, but I don’t have the day or two to spare right this week. One more thing about this, the cluster is Horton Works 2.1.3 [.0] They seem to have a claim of supporting spark on Horton Works 2.2 Dale. From: Ted Yu mailto:yuzhih...@gmail.com>> Date: Thursday, March 26, 2015 at 4:54 PM To: "Johnson, Dale" mailto:daljohn...@ebay.com>> Cc: user mailto:user@spark.apache.org>> Subject: Re: Can't access file in spark, but can in hadoop Looks like the following assertion failed: Preconditions.checkState(storageIDsCount == locs.size()); locs is List Can you enhance the assertion to log more information ? Cheers On Thu, Mar 26, 2015 at 3:06 PM, Dale Johnson mailto:daljohn...@ebay.com>> wrote: There seems to be a special kind of "corrupted according to Spark" state of file in HDFS. I have isolated a set of files (maybe 1% of all files I need to work with) which are producing the following stack dump when I try to sc.textFile() open them. When I try to open directories, most large directories contain at least one file of this type. Curiously, the following two lines fail inside of a Spark job, but not inside of a Scoobi job: val conf = new org.apache.hadoop.conf.Configuration val fs = org.apache.hadoop.fs.FileSystem.get(conf) The stack trace follows: 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: null) Exception in thread "Driver" java.lang.IllegalStateException at org.spark-project.guava.common.base.Preconditions.checkState(Preconditions.java:133) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:673) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convertLocatedBlock(PBHelper.java:1100) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1118) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1251) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1354) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1363) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:518) at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743) at org.apache.hadoop.hdfs.DistributedFileSystem$15.(DistributedFileSystem.java:738) at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:727) at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1662) at org.apache.hadoop.fs.FileSystem$5.(FileSystem.java:1724) at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1721) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1125) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1123) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$.main(SpellQuery.scala:1123) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch.main(SpellQuery.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427) 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook It appears to have found the three copies of the given HDFS block, but is performing some sort of validation with them before giving them back to spark to schedule the job. But there is an assert failing. I've tried this with 1.2.0, 1.2.1 and 1.3.0, and I get the exact same error, but I've seen the line numbers change on the HDFS libraries, but not the function names. I've tried recompiling myself with different hadoop versions, and it's the same. We're running hadoop 2.4.1 on our cluster. A google search turns up absolutely nothing on this. Any insight at all would be appreciated. Dale Johnson Applied Researcher eBay.com<http://eBay.com> -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-file-in-spark-but-can-in-hadoop-tp22251
Re: 2 input paths generate 3 partitions
Hi Rares, The number of partitions is controlled by the HDFS input format, and one file may produce multiple partitions if it consists of multiple blocks. In your case, I think there is one file with 2 splits. Thanks. Zhan Zhang On Mar 27, 2015, at 3:12 PM, Rares Vernica mailto:rvern...@gmail.com>> wrote: Hello, I am using the Spark shell in Scala on the localhost. I am using sc.textFile to read a directory. The directory looks like this (generated by another Spark script): part-0 part-1 _SUCCESS The part-0 has four short lines of text while part-1 has two short lines of text. The _SUCCESS file is empty. When I check the number of partitions on the RDD I get: scala> foo.partitions.length 15/03/27 14:57:31 INFO FileInputFormat: Total input paths to process : 2 res68: Int = 3 I wonder why the two input files generate three partitions. Does Spark check the number of lines in each file and try to generate three balanced partitions? Thanks! Rares
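To see how the input splits map to partitions, a small sketch along these lines can help (the path is a placeholder for the directory in the example):

val foo = sc.textFile("/path/to/output-dir")
// count the lines in each partition to see which split it corresponds to
foo.mapPartitionsWithIndex { (i, iter) => Iterator((i, iter.size)) }
   .collect()
   .foreach { case (i, n) => println(s"partition $i: $n lines") }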
Re: RDD.map does not allowed to preservesPartitioning?
Thanks all for the quick response. Thanks. Zhan Zhang On Mar 26, 2015, at 3:14 PM, Patrick Wendell wrote: > I think we have a version of mapPartitions that allows you to tell > Spark the partitioning is preserved: > > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639 > > We could also add a map function that does same. Or you can just write > your map using an iterator. > > - Patrick > > On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney wrote: >> This is just a deficiency of the api, imo. I agree: mapValues could >> definitely be a function (K, V)=>V1. The option isn't set by the function, >> it's on the RDD. So you could look at the code and do this. >> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala >> >> def mapValues[U](f: V => U): RDD[(K, U)] = { >>val cleanF = self.context.clean(f) >>new MapPartitionsRDD[(K, U), (K, V)](self, >> (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) }, >> preservesPartitioning = true) >> } >> >> What you want: >> >> def mapValues[U](f: (K, V) => U): RDD[(K, U)] = { >>val cleanF = self.context.clean(f) >>new MapPartitionsRDD[(K, U), (K, V)](self, >> (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) }, >> preservesPartitioning = true) >> } >> >> One of the nice things about spark is that making such new operators is very >> easy :) >> >> 2015-03-26 17:54 GMT-04:00 Zhan Zhang : >> >>> Thanks Jonathan. You are right regarding rewrite the example. >>> >>> I mean providing such option to developer so that it is controllable. The >>> example may seems silly, and I don't know the use cases. >>> >>> But for example, if I also want to operate both the key and value part to >>> generate some new value with keeping key part untouched. Then mapValues may >>> not be able to do this. >>> >>> Changing the code to allow this is trivial, but I don't know whether there >>> is some special reason behind this. >>> >>> Thanks. >>> >>> Zhan Zhang >>> >>> >>> >>> >>> On Mar 26, 2015, at 2:49 PM, Jonathan Coveney wrote: >>> >>> I believe if you do the following: >>> >>> >>> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString >>> >>> (8) MapPartitionsRDD[34] at reduceByKey at :23 [] >>> | MapPartitionsRDD[33] at mapValues at :23 [] >>> | ShuffledRDD[32] at reduceByKey at :23 [] >>> +-(8) MapPartitionsRDD[31] at map at :23 [] >>>| ParallelCollectionRDD[30] at parallelize at :23 [] >>> >>> The difference is that spark has no way to know that your map closure >>> doesn't change the key. if you only use mapValues, it does. Pretty cool that >>> they optimized that :) >>> >>> 2015-03-26 17:44 GMT-04:00 Zhan Zhang : >>>> >>>> Hi Folks, >>>> >>>> Does anybody know what is the reason not allowing preserverPartitioning >>>> in RDD.map? Do I miss something here? >>>> >>>> Following example involves two shuffles. I think if preservePartitioning >>>> is allowed, we can avoid the second one, right? 
>>>> >>>> val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) >>>> val r2 = r1.map((_, 1)) >>>> val r3 = r2.reduceByKey(_+_) >>>> val r4 = r3.map(x=>(x._1, x._2 + 1)) >>>> val r5 = r4.reduceByKey(_+_) >>>> r5.collect.foreach(println) >>>> >>>> scala> r5.toDebugString >>>> res2: String = >>>> (8) ShuffledRDD[4] at reduceByKey at :29 [] >>>> +-(8) MapPartitionsRDD[3] at map at :27 [] >>>>| ShuffledRDD[2] at reduceByKey at :25 [] >>>>+-(8) MapPartitionsRDD[1] at map at :23 [] >>>> | ParallelCollectionRDD[0] at parallelize at :21 [] >>>> >>>> Thanks. >>>> >>>> Zhan Zhang >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> >>> >>> >> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
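For reference, a minimal sketch of the workaround discussed in this thread: keep the key untouched and tell Spark the partitioning is preserved (via mapValues, or via mapPartitions with preservesPartitioning = true as below), so the second reduceByKey can reuse the partitioner and skip the second shuffle. This mirrors the example above and is only a sketch:

val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
val r2 = r1.map((_, 1))
val r3 = r2.reduceByKey(_ + _)                      // first (unavoidable) shuffle
val r4 = r3.mapPartitions(
  iter => iter.map { case (k, v) => (k, v + 1) },   // key left unchanged
  preservesPartitioning = true)
val r5 = r4.reduceByKey(_ + _)                      // reuses r3's partitioner, no extra shuffle
r5.collect.foreach(println)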
Re: RDD.map does not allowed to preservesPartitioning?
Thanks Jonathan. You are right regarding rewrite the example. I mean providing such option to developer so that it is controllable. The example may seems silly, and I don’t know the use cases. But for example, if I also want to operate both the key and value part to generate some new value with keeping key part untouched. Then mapValues may not be able to do this. Changing the code to allow this is trivial, but I don’t know whether there is some special reason behind this. Thanks. Zhan Zhang On Mar 26, 2015, at 2:49 PM, Jonathan Coveney mailto:jcove...@gmail.com>> wrote: I believe if you do the following: sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString (8) MapPartitionsRDD[34] at reduceByKey at :23 [] | MapPartitionsRDD[33] at mapValues at :23 [] | ShuffledRDD[32] at reduceByKey at :23 [] +-(8) MapPartitionsRDD[31] at map at :23 [] | ParallelCollectionRDD[30] at parallelize at :23 [] The difference is that spark has no way to know that your map closure doesn't change the key. if you only use mapValues, it does. Pretty cool that they optimized that :) 2015-03-26 17:44 GMT-04:00 Zhan Zhang mailto:zzh...@hortonworks.com>>: Hi Folks, Does anybody know what is the reason not allowing preserverPartitioning in RDD.map? Do I miss something here? Following example involves two shuffles. I think if preservePartitioning is allowed, we can avoid the second one, right? val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) val r2 = r1.map((_, 1)) val r3 = r2.reduceByKey(_+_) val r4 = r3.map(x=>(x._1, x._2 + 1)) val r5 = r4.reduceByKey(_+_) r5.collect.foreach(println) scala> r5.toDebugString res2: String = (8) ShuffledRDD[4] at reduceByKey at :29 [] +-(8) MapPartitionsRDD[3] at map at :27 [] | ShuffledRDD[2] at reduceByKey at :25 [] +-(8) MapPartitionsRDD[1] at map at :23 [] | ParallelCollectionRDD[0] at parallelize at :21 [] Thanks. Zhan Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
RDD.map does not allowed to preservesPartitioning?
Hi Folks, Does anybody know the reason for not allowing preservesPartitioning in RDD.map? Am I missing something here? The following example involves two shuffles. I think if preservesPartitioning were allowed, we could avoid the second one, right? val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) val r2 = r1.map((_, 1)) val r3 = r2.reduceByKey(_+_) val r4 = r3.map(x=>(x._1, x._2 + 1)) val r5 = r4.reduceByKey(_+_) r5.collect.foreach(println) scala> r5.toDebugString res2: String = (8) ShuffledRDD[4] at reduceByKey at :29 [] +-(8) MapPartitionsRDD[3] at map at :27 [] | ShuffledRDD[2] at reduceByKey at :25 [] +-(8) MapPartitionsRDD[1] at map at :23 [] | ParallelCollectionRDD[0] at parallelize at :21 [] Thanks. Zhan Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: OOM for HiveFromSpark example
You can do it in $SPARK_HOME/conf/spark-defaults.con spark.driver.extraJavaOptions -XX:MaxPermSize=512m Thanks. Zhan Zhang On Mar 25, 2015, at 7:25 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) mailto:deepuj...@gmail.com>> wrote: Where and how do i pass this or other JVM argument ? -XX:MaxPermSize=512m On Wed, Mar 25, 2015 at 11:36 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: I solve this by increase the PermGen memory size in driver. -XX:MaxPermSize=512m Thanks. Zhan Zhang On Mar 25, 2015, at 10:54 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) mailto:deepuj...@gmail.com>> wrote: I am facing same issue, posted a new thread. Please respond. On Wed, Jan 14, 2015 at 4:38 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Folks, I am trying to run hive context in yarn-cluster mode, but met some error. Does anybody know what cause the issue. I use following cmd to build the distribution: ./make-distribution.sh -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4 15/01/13 17:59:42 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/01/13 17:59:42 INFO storage.BlockManagerMasterActor: Registering block manager cn122-10.l42scl.hortonworks.com:56157<http://cn122-10.l42scl.hortonworks.com:56157/> with 1589.8 MB RAM, BlockManagerId(2, cn122-10.l42scl.hortonworks.com<http://cn122-10.l42scl.hortonworks.com/>, 56157) 15/01/13 17:59:43 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 17:59:43 INFO parse.ParseDriver: Parse Completed 15/01/13 17:59:44 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/01/13 17:59:44 INFO metastore.ObjectStore: ObjectStore, initialize called 15/01/13 17:59:44 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/01/13 17:59:44 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/01/13 17:59:52 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order" 15/01/13 17:59:52 INFO metastore.MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: "@" (64), after : "". 15/01/13 17:59:53 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:53 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:59 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:59 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 18:00:00 INFO metastore.ObjectStore: Initialized ObjectStore 15/01/13 18:00:00 WARN metastore.ObjectStore: Version information not found in metastore. 
hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa 15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added admin role in metastore 15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added public role in metastore 15/01/13 18:00:01 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty 15/01/13 18:00:01 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr. 15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:02 INFO ql.Driver: Concurrency mode is disabled, not creating a lock manager 15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 18:00:03 INFO parse.ParseDriver: Parse Completed 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Starting Semantic Analysis 15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Creating table src position=27 15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=src 15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang ip=unknown-ip-addr cmd=get_table : db=default tbl=src 15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_database: default 15/
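For reference, the file referred to above is $SPARK_HOME/conf/spark-defaults.conf; a sketch of the two usual ways to pass the option (the 512m value is only an example):

# in $SPARK_HOME/conf/spark-defaults.conf
spark.driver.extraJavaOptions  -XX:MaxPermSize=512m

# or per job on the command line
./bin/spark-submit --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=512m" ...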
Re: OOM for HiveFromSpark example
I solve this by increase the PermGen memory size in driver. -XX:MaxPermSize=512m Thanks. Zhan Zhang On Mar 25, 2015, at 10:54 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) mailto:deepuj...@gmail.com>> wrote: I am facing same issue, posted a new thread. Please respond. On Wed, Jan 14, 2015 at 4:38 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Folks, I am trying to run hive context in yarn-cluster mode, but met some error. Does anybody know what cause the issue. I use following cmd to build the distribution: ./make-distribution.sh -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4 15/01/13 17:59:42 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/01/13 17:59:42 INFO storage.BlockManagerMasterActor: Registering block manager cn122-10.l42scl.hortonworks.com:56157<http://cn122-10.l42scl.hortonworks.com:56157/> with 1589.8 MB RAM, BlockManagerId(2, cn122-10.l42scl.hortonworks.com<http://cn122-10.l42scl.hortonworks.com/>, 56157) 15/01/13 17:59:43 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 17:59:43 INFO parse.ParseDriver: Parse Completed 15/01/13 17:59:44 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/01/13 17:59:44 INFO metastore.ObjectStore: ObjectStore, initialize called 15/01/13 17:59:44 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/01/13 17:59:44 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/01/13 17:59:52 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order" 15/01/13 17:59:52 INFO metastore.MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: "@" (64), after : "". 15/01/13 17:59:53 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:53 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:59 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:59 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 18:00:00 INFO metastore.ObjectStore: Initialized ObjectStore 15/01/13 18:00:00 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa 15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added admin role in metastore 15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added public role in metastore 15/01/13 18:00:01 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty 15/01/13 18:00:01 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr. 
15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:02 INFO ql.Driver: Concurrency mode is disabled, not creating a lock manager 15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 18:00:03 INFO parse.ParseDriver: Parse Completed 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Starting Semantic Analysis 15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Creating table src position=27 15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=src 15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang ip=unknown-ip-addr cmd=get_table : db=default tbl=src 15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_database: default 15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang ip=unknown-ip-addr cmd=get_database: default 15/01/13 18:00:03 INFO ql.Driver: Semantic Analysis Completed 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null) 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogge
Re: Spark-thriftserver Issue
You can try to set it in spark-env.sh. # - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs) # - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp) Thanks. Zhan Zhang On Mar 24, 2015, at 12:10 PM, Anubhav Agarwal mailto:anubha...@gmail.com>> wrote: Zhan specifying port fixed the port issue. Is it possible to specify the log directory while starting the spark thriftserver? Still getting this error even through the folder exists and everyone has permission to use that directory. drwxr-xr-x 2 root root 4096 Mar 24 19:04 spark-events Exception in thread "main" java.lang.IllegalArgumentException: Log directory /tmp/spark-events does not exist. at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99) at org.apache.spark.SparkContext.(SparkContext.scala:399) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:49) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:58) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) On Mon, Mar 23, 2015 at 6:51 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Probably the port is already used by others, e.g., hive. You can change the port similar to below ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf hive.server2.thrift.port=10001 Thanks. Zhan Zhang On Mar 23, 2015, at 12:01 PM, Neil Dev mailto:neilk...@gmail.com>> wrote: Hi, I am having issue starting spark-thriftserver. I'm running spark 1.3.with Hadoop 2.4.0. I would like to be able to change its port too so, I can hive hive-thriftserver as well as spark-thriftserver running at the same time. Starting sparkthrift server:- sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077 --executor-memory 2G Error:- I created the folder manually but still getting the following error Exception in thread "main" java.lang.IllegalArgumentException: Log directory /tmp/spark-events does not exist. I am getting the following error 15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error: org.apache.thrift.transport.TTransportException: Could not create ServerSocket on address0.0.0.0/0.0.0.0:1<http://0.0.0.0:1/>. at org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93) at org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79) at org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236) at org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69) at java.lang.Thread.run(Thread.java:745) Thanks Neil
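A side note on the error in the quoted mail: "Log directory /tmp/spark-events does not exist" comes from the event-logging listener rather than from the daemon log location, so the relevant knobs are the event-log settings. A sketch of the usual spark-defaults.conf entries (the path is an example and must already exist on the configured filesystem):

spark.eventLog.enabled  true
spark.eventLog.dir      hdfs:///spark-events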
Re: Spark-thriftserver Issue
Probably the port is already used by another service, e.g., Hive. You can change the port similar to below ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf hive.server2.thrift.port=10001 Thanks. Zhan Zhang On Mar 23, 2015, at 12:01 PM, Neil Dev mailto:neilk...@gmail.com>> wrote: Hi, I am having an issue starting spark-thriftserver. I'm running spark 1.3 with Hadoop 2.4.0. I would like to be able to change its port too so I can have hive-thriftserver as well as spark-thriftserver running at the same time. Starting sparkthrift server:- sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077 --executor-memory 2G Error:- I created the folder manually but still getting the following error Exception in thread "main" java.lang.IllegalArgumentException: Log directory /tmp/spark-events does not exist. I am getting the following error 15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error: org.apache.thrift.transport.TTransportException: Could not create ServerSocket on address0.0.0.0/0.0.0.0:1. at org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93) at org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79) at org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236) at org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69) at java.lang.Thread.run(Thread.java:745) Thanks Neil
Re: Spark Job History Server
Hi Patcharee, It is an alpha feature in the HDP distribution, integrating ATS with the Spark history server. If you are using upstream Spark, you can configure the history server as usual without these settings. But the other related configuration is still mandatory, such as the hdp.version settings. Thanks. Zhan Zhang On Mar 18, 2015, at 3:30 AM, patcharee wrote: > Hi, > > I am using spark 1.3. I would like to use Spark Job History Server. I added > the following line into conf/spark-defaults.conf > > spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService > spark.history.provider > org.apache.spark.deploy.yarn.history.YarnHistoryProvider > spark.yarn.historyServer.address sandbox.hortonworks.com:19888 > > But got Exception in thread "main" java.lang.ClassNotFoundException: > org.apache.spark.deploy.yarn.history.YarnHistoryProvider > > What class is really needed? How to fix it? > > Br, > Patcharee > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
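For an upstream (non-HDP) build, the history server is typically wired up with entries like these in conf/spark-defaults.conf; the host, port, and path below are examples only:

spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///spark-history
spark.history.fs.logDirectory     hdfs:///spark-history
spark.yarn.historyServer.address  historyhost:18080

and the server process itself is started with ./sbin/start-history-server.sh.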
Re: Saving Dstream into a single file
Each RDD has multiple partitions, and each of them will produce one HDFS file when the output is saved. I don't think you are allowed to have multiple file handlers writing to the same HDFS file. You can still load multiple files into Hive tables, right? Thanks. Zhan Zhang On Mar 15, 2015, at 7:31 AM, tarek_abouzeid wrote: > i am doing the word count example on a flume stream and trying to save the output as > text files in HDFS , but in the save directory i got multiple sub > directories each having files with small size , i wonder if there is a way > to append to a large file instead of saving in multiple files , as i intend > to save the output in the hive hdfs directory so i can query the result using > hive > > hope anyone has a workaround for this issue , Thanks in advance > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Dstream-into-a-single-file-tp22058.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
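If a single file per batch is acceptable despite the serialized write, one hedged workaround (Scala; the wordCounts DStream name and the output prefix are assumptions) is to collapse each batch to one partition before saving:

wordCounts.foreachRDD { (rdd, time) =>
  // one partition => one part- file per batch; each batch still lands in its own directory
  rdd.coalesce(1).saveAsTextFile("hdfs:///user/hive/warehouse/wordcounts/" + time.milliseconds)
}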
Re: LogisticRegressionWithLBFGS shows ERRORs
It is during function evaluation in the line search, the value is either infinite or NaN, which may be caused too large step size. In the code, the step is reduced to half. Thanks. Zhan Zhang On Mar 13, 2015, at 2:41 PM, cjwang wrote: > I am running LogisticRegressionWithLBFGS. I got these lines on my console: > > 2015-03-12 17:38:03,897 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to 0.5 > 2015-03-12 17:38:03,967 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to 0.25 > 2015-03-12 17:38:04,036 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to 0.125 > 2015-03-12 17:38:04,105 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to > 0.0625 > 2015-03-12 17:38:04,176 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to > 0.03125 > 2015-03-12 17:38:04,247 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to > 0.015625 > 2015-03-12 17:38:04,317 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to > 0.0078125 > 2015-03-12 17:38:04,461 ERROR breeze.optimize.StrongWolfeLineSearch | > Encountered bad values in function evaluation. Decreasing step size to > 0.005859375 > 2015-03-12 17:38:04,605 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:04,672 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:04,747 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:04,818 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:04,890 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:04,962 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:05,038 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:05,107 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:05,186 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:05,256 INFO breeze.optimize.StrongWolfeLineSearch | Line > search t: NaN fval: NaN rhs: NaN cdd: NaN > 2015-03-12 17:38:05,257 ERROR breeze.optimize.LBFGS | Failure! Resetting > history: breeze.optimize.FirstOrderException: Line search zoom failed > > > What causes them and how do I fix them? > > I checked my data and there seemed nothing out of the ordinary. The > resulting prediction model seemed acceptable to me. So, are these ERRORs > actually WARNINGs? Could we or should we tune the level of these messages > down one notch? > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/LogisticRegressionWithLBFGS-shows-ERRORs-tp22042.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. 
> > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Process time series RDD after sortByKey
Does a code flow similar to the following work for you? It processes each partition of an RDD sequentially: while (iterPartition < rdd.partitions.length) { val res = sc.runJob(rdd, (it: Iterator[T]) => someFunc(it), Seq(iterPartition), allowLocal = true); /* do something with the result of this partition */ iterPartition += 1 } You can refer to RDD.take as an example. Thanks. Zhan Zhang On Mar 9, 2015, at 3:41 PM, Shuai Zheng mailto:szheng.c...@gmail.com>> wrote: Hi All, I am processing some time series data. For one day, it might have 500GB; then for each hour, it is around 20GB of data. I need to sort the data before I start processing. Assume I can sort them successfully with dayRDD.sortByKey, but after that, I might have thousands of partitions (to make the sort succeed), maybe 1000 partitions. And then I try to process the data by hour (it does not need to be exactly one hour, just some similar time frame). And I can't just repartition to 24 because then one partition might be too big to fit into memory (if it is 20GB). So is there any way for me to process the underlying partitions in a certain order? Basically I want to call mapPartitionsWithIndex with a range of indexes? Any way to do it? Hope I described my issue clearly… :) Regards, Shuai
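A more concrete, runnable version of the loop sketched above (Scala, spark-shell; the data is a toy stand-in for the sorted time-series RDD, and the println is a placeholder for the real per-time-frame processing):

val sorted = sc.parallelize(1 to 20).map(i => (i, "record-" + i)).sortByKey(numPartitions = 4)

var iterPartition = 0
while (iterPartition < sorted.partitions.length) {
  // run a job on just this one partition and pull its rows back to the driver
  val chunk = sc.runJob(sorted, (it: Iterator[(Int, String)]) => it.toArray, Seq(iterPartition), true)
  println(s"partition $iterPartition has ${chunk.head.length} records")   // process chunk.head here
  iterPartition += 1
}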
Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
You are using 1.2.1, right? If so, please add a java-opts file in the conf directory and give it a try. [root@c6401 conf]# more java-opts -Dhdp.version=2.2.2.0-2041 Thanks. Zhan Zhang On Mar 6, 2015, at 11:35 AM, Todd Nist mailto:tsind...@gmail.com>> wrote: -Dhdp.version=2.2.0.0-2041
Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Sorry, my misunderstanding. It looks like it already worked. If you still hit an hdp.version problem, you can give it a try :) Thanks. Zhan Zhang On Mar 6, 2015, at 11:40 AM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: You are using 1.2.1, right? If so, please add a java-opts file in the conf directory and give it a try. [root@c6401 conf]# more java-opts -Dhdp.version=2.2.2.0-2041 Thanks. Zhan Zhang On Mar 6, 2015, at 11:35 AM, Todd Nist mailto:tsind...@gmail.com>> wrote: -Dhdp.version=2.2.0.0-2041
Re: [SPARK-SQL] How to pass parameter when running hql script using cli?
Do you mean "--hiveconf" (two dashes) instead of "-hiveconf" (one dash)? Thanks. Zhan Zhang On Mar 6, 2015, at 4:20 AM, James wrote: > Hello, > > I want to execute a hql script through the `spark-sql` command; my script > contains: > > ``` > ALTER TABLE xxx > DROP PARTITION (date_key = ${hiveconf:CUR_DATE}); > ``` > > when I execute > > ``` > spark-sql -f script.hql -hiveconf CUR_DATE=20150119 > ``` > > It throws an error like > ``` > cannot recognize input near '$' '{' 'hiveconf' in constant > ``` > > I have tried this on hive and it works. So how could I pass a parameter such as a date > to a hql script? > > Alcaid - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
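What is being suggested to try, for reference (whether the variable actually gets substituted into the script depends on the Spark SQL CLI version, so treat this as unverified):

spark-sql -f script.hql --hiveconf CUR_DATE=20150119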
Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Hi Todd, Looks like the thrift server can connect to metastore, but something wrong in the executors. You can try to get the log with "yarn logs -applicationID xxx” to check why it failed. If there is no log (master or executor is not started at all), you can go to the RM webpage, click the link to see why the shell failed in the first place. Thanks. Zhan Zhang On Mar 6, 2015, at 9:59 AM, Todd Nist mailto:tsind...@gmail.com>> wrote: First, thanks to everyone for their assistance and recommendations. @Marcelo I applied the patch that you recommended and am now able to get into the shell, thank you worked great after I realized that the pom was pointing to the 1.3.0-SNAPSHOT for parent, need to bump that down to 1.2.1. @Zhan Need to apply this patch next. I tried to start the spark-thriftserver but and it starts, then fails with like this: I have the entries in my spark-default.conf, but not the patch applied. ./sbin/start-thriftserver.sh --master yarn --executor-memory 1024m --hiveconf hive.server2.thrift.port=10001 5/03/06 12:34:17 INFO ui.SparkUI: Started SparkUI at http://hadoopdev01<http://hadoopdev01/>.opsdatastore.com:4040 15/03/06 12:34:18 INFO impl.TimelineClientImpl: Timeline service address: http://hadoopdev02<http://hadoopdev02/>.opsdatastore.com:8188/ws/v1/timeline/ 15/03/06 12:34:18 INFO client.RMProxy: Connecting to ResourceManager at hadoopdev02.opsdatastore.com/192.168.15.154:8050 15/03/06 12:34:18 INFO yarn.Client: Requesting a new application from cluster with 4 NodeManagers 15/03/06 12:34:18 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container) 15/03/06 12:34:18 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/03/06 12:34:18 INFO yarn.Client: Setting up container launch context for our AM 15/03/06 12:34:18 INFO yarn.Client: Preparing resources for our AM container 15/03/06 12:34:19 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 
15/03/06 12:34:19 INFO yarn.Client: Uploading resource file:/root/spark-1.2.1-bin-hadoop2.6/lib/spark-assembly-1.2.1-hadoop2.6.0.jar -> hdfs://hadoopdev01.opsdatastore.com:8020/user/root/.sparkStaging/application_1425078697953_0018/spark-assembly-1.2.1-hadoop2.6.0.jar 15/03/06 12:34:21 INFO yarn.Client: Setting up the launch environment for our AM container 15/03/06 12:34:21 INFO spark.SecurityManager: Changing view acls to: root 15/03/06 12:34:21 INFO spark.SecurityManager: Changing modify acls to: root 15/03/06 12:34:21 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 15/03/06 12:34:21 INFO yarn.Client: Submitting application 18 to ResourceManager 15/03/06 12:34:21 INFO impl.YarnClientImpl: Submitted application application_1425078697953_0018 15/03/06 12:34:22 INFO yarn.Client: Application report for application_1425078697953_0018 (state: ACCEPTED) 15/03/06 12:34:22 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: default start time: 1425663261755 final status: UNDEFINED tracking URL: http://hadoopdev02<http://hadoopdev02/>.opsdatastore.com:8088/proxy/application_1425078697953_0018/ user: root 15/03/06 12:34:23 INFO yarn.Client: Application report for application_1425078697953_0018 (state: ACCEPTED) 15/03/06 12:34:24 INFO yarn.Client: Application report for application_1425078697953_0018 (state: ACCEPTED) 15/03/06 12:34:25 INFO yarn.Client: Application report for application_1425078697953_0018 (state: ACCEPTED) 15/03/06 12:34:26 INFO yarn.Client: Application report for application_1425078697953_0018 (state: ACCEPTED) 15/03/06 12:34:27 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkyar...@hadoopdev08.opsdatastore.com:40201/user/YarnAM#-557112763] 15/03/06 12:34:27 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> hadoopdev02.opsdatastore.com, PROXY_URI_BASES -> http://hadoopdev02<http://hadoopdev02/>.opsdatastore.com:8088/proxy/application_1425078697953_0018), /proxy/application_1425078697953_0018 15/03/06 12:34:27 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/03/06 12:34:27 INFO yarn.Client: Application report for application_1425078697953_0018 (state: RUNNING) 15/03/06 12:34:27 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: hadoopdev08.opsdatastore.com ApplicationMaster RPC port: 0 queue: default start time: 1425663261755 final status: UNDEFINED tracking URL: http://hadoo
Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
In addition, if you use HDP 2.2, you may need the following patch (if it is not already in 1.2.1) to solve a system property issue: https://github.com/apache/spark/pull/3409 You can follow this link to set hdp.version in the java options: http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ Thanks. Zhan Zhang On Mar 5, 2015, at 11:09 AM, Marcelo Vanzin mailto:van...@cloudera.com>> wrote: It seems from the excerpt below that your cluster is set up to use the Yarn ATS, and the code is failing in that path. I think you'll need to apply the following patch to your Spark sources if you want this to work: https://github.com/apache/spark/pull/3938 On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist wrote: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:166) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:65) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.(SparkContext.scala:348) -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: TreeNodeException: Unresolved attributes
Which spark version did you use? I tried spark-1.2.1 and didn’t meet this problem. scala> val m = hiveContext.sql(" select * from testtable where value like '%Restaurant%'") 15/03/05 02:02:30 INFO ParseDriver: Parsing command: select * from testtable where value like '%Restaurant%' 15/03/05 02:02:30 INFO ParseDriver: Parse Completed 15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(462299) called with curMem=1087888, maxMem=280248975 15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 451.5 KB, free 265.8 MB) 15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(81645) called with curMem=1550187, maxMem=280248975 15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 79.7 KB, free 265.7 MB) 15/03/05 02:02:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on c6402.ambari.apache.org<http://c6402.ambari.apache.org>:33696 (size: 79.7 KB, free: 267.0 MB) 15/03/05 02:02:30 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 15/03/05 02:02:30 INFO DefaultExecutionContext: Created broadcast 2 from broadcast at TableReader.scala:68 m: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == Filter Contains(value#5, Restaurant) HiveTableScan [key#4,value#5], (MetastoreRelation default, testtable, None), None scala> Thanks. Zhan Zhang On Mar 4, 2015, at 9:09 AM, Anusha Shamanur mailto:anushas...@gmail.com>> wrote: I tried. I still get the same error. 15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from TableName where value like '%Restaurant%' 15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed. 15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=TableName 15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339 ip=unknown-ip-addr cmd=get_table : db=default tbl=TableName results: org.apache.spark.sql.SchemaRDD = SchemaRDD[86] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: 'Project [*] 'Filter ('value LIKE Restaurant) MetastoreRelation default, TableName, None On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda mailto:ar...@sigmoidanalytics.com>> wrote: Why don't you formulate a string before you pass it to the hql function (appending strings), and hql function is deprecated. You should use sql. http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur mailto:anushas...@gmail.com>> wrote: Hi, I am trying to run a simple select query on a table. val restaurants=hiveCtx.hql("select * from TableName where column like '%SomeString%' ") This gives an error as below: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: How do I solve this? -- Regards, Anusha -- [Sigmoid Analytics]<http://htmlsig.com/www.sigmoidanalytics.com> Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com<mailto:ar...@sigmoidanalytics.com> || www.sigmoidanalytics.com<http://www.sigmoidanalytics.com/> -- Regards, Anusha
Re: RDD coalesce or repartition by #records or #bytes?
It uses HashPartitioner to distribute the records to different partitions, but the keys are just integers spread evenly across the output partitions. From the code, each resulting partition will get a very similar number of records. Thanks. Zhan Zhang On Mar 4, 2015, at 3:47 PM, Du Li mailto:l...@yahoo-inc.com.INVALID>> wrote: Hi, My RDDs are created from a Kafka stream. After receiving an RDD, I want to coalesce/repartition it so that the data will be processed on a set of machines in parallel, as evenly as possible. The number of processing nodes is larger than the number of receiving nodes. My question is how coalesce/repartition works. Does it distribute by the number of records or the number of bytes? In my app, my observation is that the distribution seems to be by number of records. The consequence, however, is that some executors have to process x1000 as much data when the sizes of records are very skewed. Then we have to allocate memory for the worst case. Is there a way to programmatically affect the coalesce/repartition scheme? Thanks, Du
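A quick way to see the record-count balance (not the byte balance) after a repartition, as a hedged Scala sketch with made-up skewed data:

val skewed = sc.parallelize(Seq.fill(1000)("small") ++ Seq.fill(10)("x" * 100000), 4)
val counts = skewed.repartition(8)
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()
counts.foreach { case (idx, n) => println(s"partition $idx -> $n records") }   // similar counts, very different bytes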
Re: Issue with yarn cluster - hangs in accepted state.
Do you have enough resource in your cluster? You can check your resource manager to see the usage. Thanks. Zhan Zhang On Mar 3, 2015, at 8:51 AM, abhi mailto:abhishek...@gmail.com>> wrote: I am trying to run below java class with yarn cluster, but it hangs in accepted state . i don't see any error . Below is the class and command . Any help is appreciated . Thanks, Abhi bin/spark-submit --class com.mycompany.app.SimpleApp --master yarn-cluster /home/hduser/my-app-1.0.jar {code} public class SimpleApp { public static void main(String[] args) { String logFile = "/home/hduser/testspark.txt"; // Should be some file on your system SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new Function() { public Boolean call(String s) { return s.contains("a"); } }).count(); long numBs = logData.filter(new Function() { public Boolean call(String s) { return s.contains("b"); } }).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); } } {code} 15/03/03 11:47:40 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:41 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:42 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:43 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:44 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:45 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:46 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:47 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:48 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:49 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:50 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:51 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:52 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:53 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:54 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:55 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:56 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:57 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:58 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:47:59 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:48:00 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:48:01 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:48:02 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:48:03 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED) 15/03/03 11:48:04 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED
Re: Resource manager UI for Spark applications
In Yarn (Cluster or client), you can access the spark ui when the app is running. After app is done, you can still access it, but need some extra setup for history server. Thanks. Zhan Zhang On Mar 3, 2015, at 10:08 AM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: bq. changing the address with internal to the external one , but still does not work. Not sure what happened. For the time being, you can use yarn command line to pull container log (put in your appId and container Id): yarn logs -applicationId application_1386639398517_0007 -containerId container_1386639398517_0007_01_19 Cheers On Tue, Mar 3, 2015 at 9:50 AM, roni mailto:roni.epi...@gmail.com>> wrote: Hi Ted, I used s3://support.elasticmapreduce/spark/install-spark to install spark on my EMR cluster. It is 1.2.0. When I click on the link for history or logs it takes me to http://ip-172-31-43-116.us-west-2.compute.internal:9035/node/containerlogs/container_1424105590052_0070_01_01/hadoop and I get - The server at ip-172-31-43-116.us<http://ip-172-31-43-116.us>-west-2.compute.internal can't be found, because the DNS lookup failed. DNS is the network service that translates a website's name to its Internet address. This error is most often caused by having no connection to the Internet or a misconfigured network. It can also be caused by an unresponsive DNS server or a firewall preventing Google Chrome from accessing the network. I tried changing the address with internal to the external one , but still does not work. Thanks _roni On Tue, Mar 3, 2015 at 9:05 AM, Ted Yu mailto:yuzhih...@gmail.com>> wrote: bq. spark UI does not work for Yarn-cluster. Can you be a bit more specific on the error(s) you saw ? What Spark release are you using ? Cheers On Tue, Mar 3, 2015 at 8:53 AM, Rohini joshi mailto:roni.epi...@gmail.com>> wrote: Sorry , for half email - here it is again in full Hi , I have 2 questions - 1. I was trying to use Resource Manager UI for my SPARK application using yarn cluster mode as I observed that spark UI does not work for Yarn-cluster. IS that correct or am I missing some setup? 2. when I click on Application Monitoring or history , i get re-directed to some linked with internal Ip address. Even if I replace that address with the public IP , it still does not work. What kind of setup changes are needed for that? Thanks -roni On Tue, Mar 3, 2015 at 8:45 AM, Rohini joshi mailto:roni.epi...@gmail.com>> wrote: Hi , I have 2 questions - 1. I was trying to use Resource Manager UI for my SPARK application using yarn cluster mode as I observed that spark UI does not work for Yarn-cluster. IS that correct or am I missing some setup?
Re: How to tell if one RDD depends on another
Currently in spark, it looks like there is no easy way to know the dependencies. It is solved at run time. Thanks. Zhan Zhang On Feb 26, 2015, at 4:20 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: Ted. That one I know. It was the dependency part I was curious about On Feb 26, 2015 7:12 PM, "Ted Yu" mailto:yuzhih...@gmail.com>> wrote: bq. whether or not rdd1 is a cached rdd RDD has getStorageLevel method which would return the RDD's current storage level. SparkContext has this method: * Return information about what RDDs are cached, if they are in mem or on disk, how much space * they take, etc. */ @DeveloperApi def getRDDStorageInfo: Array[RDDInfo] = { Cheers On Thu, Feb 26, 2015 at 4:00 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: Zhan, This is exactly what I'm trying to do except, as I metnioned in my first message, I am being given rdd1 and rdd2 only and I don't necessarily know at that point whether or not rdd1 is a cached rdd. Further, I don't know at that point whether or not rdd2 depends on rdd1. On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: In this case, it is slow to wait for rdd1.saveAsHasoopFile(...) to finish probably due to writing to hdfs. a walk around for this particular case may be as follows. val rdd1 = ..cache() val rdd2 = rdd1.map().() rdd1.count future { rdd1.saveAsHasoopFile(...) } future { rdd2.saveAsHadoopFile(…)] In this way, rdd1 will be calculated once, and two saveAsHadoopFile will happen concurrently. Thanks. Zhan Zhang On Feb 26, 2015, at 3:28 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: > What confused me is the statement of "The final result is that rdd1 is > calculated twice.” Is it the expected behavior? To be perfectly honest, performing an action on a cached RDD in two different threads and having them (at the partition level) block until the parent are cached would be the behavior and myself and all my coworkers expected. On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: I should probably mention that my example case is much over simplified- Let's say I've got a tree, a fairly complex one where I begin a series of jobs at the root which calculates a bunch of really really complex joins and as I move down the tree, I'm creating reports from the data that's already been joined (i've implemented logic to determine when cached items can be cleaned up, e.g. the last report has been done in a subtree). My issue is that the 'actions' on the rdds are currently being implemented in a single thread- even if I'm waiting on a cache to complete fully before I run the "children" jobs, I'm still in a better placed than I was because I'm able to run those jobs concurrently- right now this is not the case. > What you want is for a request for partition X to wait if partition X is > already being calculated in a persisted RDD. I totally agree and if I could get it so that it's waiting at the granularity of the partition, I'd be in a much much better place. I feel like I'm going down a rabbit hole and working against the Spark API. On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen mailto:so...@cloudera.com>> wrote: To distill this a bit further, I don't think you actually want rdd2 to wait on rdd1 in this case. What you want is for a request for partition X to wait if partition X is already being calculated in a persisted RDD. Otherwise the first partition of rdd2 waits on the final partition of rdd1 even when the rest is ready. That is probably usually a good idea in almost all cases. 
That much, I don't know how hard it is to implement. But I speculate that it's easier to deal with it at that level than as a function of the dependency graph. On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: > I'm trying to do the scheduling myself now- to determine that rdd2 depends > on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can > do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure > this out so I don't need to think about all this.
Re: How to tell if one RDD depends on another
I miss that part. Thanks for the explanation. It is a challenging problem implementation wise. To do it programmatically, 1. pre-analyze all DAGs to form a complete DAG with root as the source, and leaf as all actions. 2. Any RDD(node) that has more than one downstream nodes needs to be marked as cached. 3. After each action is done, remove it from the graph and clean up the nodes that does not have downstream nodes. Implementation: 1. Needs to construct the graph at every RDD transformation (internal node and edge) and actions (leaf). 2. In each runJob, identify the actions and remove it from the graph, and clean up the cache. Take yours as the example, the graph is construct as below: RDD1——>output | |_RDD2___output Thanks. Zhan Zhang On Feb 26, 2015, at 4:20 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: Ted. That one I know. It was the dependency part I was curious about On Feb 26, 2015 7:12 PM, "Ted Yu" mailto:yuzhih...@gmail.com>> wrote: bq. whether or not rdd1 is a cached rdd RDD has getStorageLevel method which would return the RDD's current storage level. SparkContext has this method: * Return information about what RDDs are cached, if they are in mem or on disk, how much space * they take, etc. */ @DeveloperApi def getRDDStorageInfo: Array[RDDInfo] = { Cheers On Thu, Feb 26, 2015 at 4:00 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: Zhan, This is exactly what I'm trying to do except, as I metnioned in my first message, I am being given rdd1 and rdd2 only and I don't necessarily know at that point whether or not rdd1 is a cached rdd. Further, I don't know at that point whether or not rdd2 depends on rdd1. On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: In this case, it is slow to wait for rdd1.saveAsHasoopFile(...) to finish probably due to writing to hdfs. a walk around for this particular case may be as follows. val rdd1 = ..cache() val rdd2 = rdd1.map().() rdd1.count future { rdd1.saveAsHasoopFile(...) } future { rdd2.saveAsHadoopFile(…)] In this way, rdd1 will be calculated once, and two saveAsHadoopFile will happen concurrently. Thanks. Zhan Zhang On Feb 26, 2015, at 3:28 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: > What confused me is the statement of "The final result is that rdd1 is > calculated twice.” Is it the expected behavior? To be perfectly honest, performing an action on a cached RDD in two different threads and having them (at the partition level) block until the parent are cached would be the behavior and myself and all my coworkers expected. On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: I should probably mention that my example case is much over simplified- Let's say I've got a tree, a fairly complex one where I begin a series of jobs at the root which calculates a bunch of really really complex joins and as I move down the tree, I'm creating reports from the data that's already been joined (i've implemented logic to determine when cached items can be cleaned up, e.g. the last report has been done in a subtree). My issue is that the 'actions' on the rdds are currently being implemented in a single thread- even if I'm waiting on a cache to complete fully before I run the "children" jobs, I'm still in a better placed than I was because I'm able to run those jobs concurrently- right now this is not the case. > What you want is for a request for partition X to wait if partition X is > already being calculated in a persisted RDD. 
I totally agree and if I could get it so that it's waiting at the granularity of the partition, I'd be in a much much better place. I feel like I'm going down a rabbit hole and working against the Spark API. On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen mailto:so...@cloudera.com>> wrote: To distill this a bit further, I don't think you actually want rdd2 to wait on rdd1 in this case. What you want is for a request for partition X to wait if partition X is already being calculated in a persisted RDD. Otherwise the first partition of rdd2 waits on the final partition of rdd1 even when the rest is ready. That is probably usually a good idea in almost all cases. That much, I don't know how hard it is to implement. But I speculate that it's easier to deal with it at that level than as a function of the dependency graph. On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: > I'm trying to do the scheduling myself now- to determine that rdd2 depends > on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can > do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure > this out so I don't need to think about all this.
Re: How to tell if one RDD depends on another
In this case, it is slow to wait for rdd1.saveAsHasoopFile(...) to finish probably due to writing to hdfs. a walk around for this particular case may be as follows. val rdd1 = ..cache() val rdd2 = rdd1.map().() rdd1.count future { rdd1.saveAsHasoopFile(...) } future { rdd2.saveAsHadoopFile(…)] In this way, rdd1 will be calculated once, and two saveAsHadoopFile will happen concurrently. Thanks. Zhan Zhang On Feb 26, 2015, at 3:28 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: > What confused me is the statement of "The final result is that rdd1 is > calculated twice.” Is it the expected behavior? To be perfectly honest, performing an action on a cached RDD in two different threads and having them (at the partition level) block until the parent are cached would be the behavior and myself and all my coworkers expected. On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: I should probably mention that my example case is much over simplified- Let's say I've got a tree, a fairly complex one where I begin a series of jobs at the root which calculates a bunch of really really complex joins and as I move down the tree, I'm creating reports from the data that's already been joined (i've implemented logic to determine when cached items can be cleaned up, e.g. the last report has been done in a subtree). My issue is that the 'actions' on the rdds are currently being implemented in a single thread- even if I'm waiting on a cache to complete fully before I run the "children" jobs, I'm still in a better placed than I was because I'm able to run those jobs concurrently- right now this is not the case. > What you want is for a request for partition X to wait if partition X is > already being calculated in a persisted RDD. I totally agree and if I could get it so that it's waiting at the granularity of the partition, I'd be in a much much better place. I feel like I'm going down a rabbit hole and working against the Spark API. On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen mailto:so...@cloudera.com>> wrote: To distill this a bit further, I don't think you actually want rdd2 to wait on rdd1 in this case. What you want is for a request for partition X to wait if partition X is already being calculated in a persisted RDD. Otherwise the first partition of rdd2 waits on the final partition of rdd1 even when the rest is ready. That is probably usually a good idea in almost all cases. That much, I don't know how hard it is to implement. But I speculate that it's easier to deal with it at that level than as a function of the dependency graph. On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: > I'm trying to do the scheduling myself now- to determine that rdd2 depends > on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can > do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure > this out so I don't need to think about all this.
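A cleaned-up, runnable sketch of the workaround described above (Scala; the input/output paths and the map functions are placeholders, and saveAsTextFile stands in for saveAsHadoopFile for brevity):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val rdd1 = sc.textFile("hdfs:///input/data").map(line => (line, line.length)).cache()
val rdd2 = rdd1.map { case (k, v) => (k, v * 2) }

rdd1.count()   // materialize rdd1 into the cache once, before the concurrent actions

val f1 = Future { rdd1.saveAsTextFile("hdfs:///output/rdd1") }
val f2 = Future { rdd2.saveAsTextFile("hdfs:///output/rdd2") }
Await.result(f1, Duration.Inf)
Await.result(f2, Duration.Inf)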
Re: How to tell if one RDD depends on another
What confused me is the statement of "The final result is that rdd1 is calculated twice.” Is it the expected behavior? Thanks. Zhan Zhang On Feb 26, 2015, at 3:03 PM, Sean Owen mailto:so...@cloudera.com>> wrote: To distill this a bit further, I don't think you actually want rdd2 to wait on rdd1 in this case. What you want is for a request for partition X to wait if partition X is already being calculated in a persisted RDD. Otherwise the first partition of rdd2 waits on the final partition of rdd1 even when the rest is ready. That is probably usually a good idea in almost all cases. That much, I don't know how hard it is to implement. But I speculate that it's easier to deal with it at that level than as a function of the dependency graph. On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet mailto:cjno...@gmail.com>> wrote: I'm trying to do the scheduling myself now- to determine that rdd2 depends on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure this out so I don't need to think about all this.
Re: Running spark function on parquet without sql
When you use sql (or the API from SchemaRDD/DataFrame) to read data from parquet, the optimizer will do column pruning, predicate pushdown, etc. Thus you get the benefit of Parquet's columnar format. After that, you can operate on the SchemaRDD (DF) like a regular RDD. Thanks. Zhan Zhang On Feb 26, 2015, at 1:50 PM, tridib wrote: > Hello Experts, > In one of my projects we are having parquet files and we are using spark SQL > to get our analytics. I am encountering situation where simple SQL is not > getting me what I need or the complex SQL is not supported by Spark Sql. In > scenarios like this I am able to get things done using low level spark > constructs like MapFunction and reducers. > > My question is if I create a JavaSchemaRdd on Parquet and use basic spark > constructs, will I still get the benefit of parquets columnar format? Will > my aggregation be as fast as it would have been if I have used SQL? > > Please advice. > > Thanks & Regards > Tridib > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-function-on-parquet-without-sql-tp21833.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org
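As a rough illustration (the file path, table, and column names are made up): do the column selection and filtering through the SchemaRDD/SQL layer first, so Parquet only reads the needed columns, then switch to plain RDD operations for whatever the SQL layer cannot express.

import org.apache.spark.SparkContext._             // pair-RDD functions in pre-1.3 Spark

val people = sqlContext.parquetFile("/data/people.parquet")   // SchemaRDD in Spark 1.2
people.registerTempTable("people")

// column pruning and predicate pushdown happen in this step
val narrowed = sqlContext.sql("SELECT name, age FROM people WHERE age > 21")

// a SchemaRDD is also an RDD[Row], so low-level constructs work from here on
val oldestByName = narrowed
  .map(row => (row.getString(0), row.getInt(1)))
  .reduceByKey((a, b) => math.max(a, b))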
Re: How to tell if one RDD depends on another
You don’t need to know rdd dependencies to maximize concurrency. Internally the scheduler will construct the DAG and trigger the execution if there are no shuffle dependencies in between the RDDs. Thanks. Zhan Zhang On Feb 26, 2015, at 1:28 PM, Corey Nolet wrote: > Let's say I'm given 2 RDDs and told to store them in a sequence file and they > have the following dependency: > > val rdd1 = sparkContext.sequenceFile().cache() > val rdd2 = rdd1.map() > > > How would I tell programmatically without being the one who built rdd1 and > rdd2 whether or not rdd2 depends on rdd1? > > I'm working on a concurrency model for my application and I won't necessarily > know how the two rdds are constructed. What I will know is whether or not > rdd1 is cached but I want to maximize concurrency and run rdd1 and rdd2 > together if rdd2 does not depend on rdd1. > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
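If you do want to inspect the lineage yourself, a rough sketch that walks rdd.dependencies recursively (for illustration only; as noted above, the scheduler already builds the DAG for you):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// true if `child` (transitively) depends on `parent`
def dependsOn(child: RDD[_], parent: RDD[_]): Boolean =
  (child == parent) || child.dependencies.exists(dep => dependsOn(dep.rdd, parent))

val rdd1 = sc.textFile("/some/path").cache()      // hypothetical
val rdd2 = rdd1.map(_.toUpperCase)

dependsOn(rdd2, rdd1)                             // true
dependsOn(rdd1, rdd2)                             // false
rdd1.getStorageLevel != StorageLevel.NONE         // true, i.e. rdd1 is marked persistent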
Re: Help me understand the partition, parallelism in Spark
Here is my understanding. When running on top of yarn, the number of cores determines how many tasks can run concurrently in one executor. But all these cores are located in the same JVM. Parallelism typically controls the balance of tasks. For example, if you have 200 cores but only 50 partitions, there will be 150 cores sitting idle. OOM: increasing the memory size and the JVM memory overhead may help here. Thanks. Zhan Zhang On Feb 26, 2015, at 2:03 PM, Yana Kadiyska mailto:yana.kadiy...@gmail.com>> wrote: Imran, I have also observed the phenomenon of reducing the cores helping with OOM. I wanted to ask this (hopefully without straying off topic): we can specify the number of cores and the executor memory. But we don't get to specify _how_ the cores are spread among executors. Is it possible that with 24G memory and 4 cores we get a spread of 1 core per executor thus ending up with 24G for the task, but with 24G memory and 10 cores some executor ends up with 3 cores on the same machine and thus we have only 8G per task? On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid mailto:iras...@cloudera.com>> wrote: Hi Yong, mostly correct except for: * Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys. no, you will not get 1000 partitions. Spark has to decide how many partitions to use before it even knows how many unique keys there are. If you have 200 as the default parallelism (or you just explicitly make it the second parameter to reduceByKey()), then you will get 200 partitions. The 1000 unique keys will be distributed across the 200 partitions. ideally they will be distributed pretty equally, but how they get distributed depends on the partitioner (by default you will have a HashPartitioner, so it depends on the hash of your keys). Note that this is more or less the same as in Hadoop MapReduce. the amount of parallelism matters b/c there are various places in spark where there is some overhead proportional to the size of a partition. So in your example, if you have 1000 unique keys in 200 partitions, you expect about 5 unique keys per partitions -- if instead you had 10 partitions, you'd expect 100 unique keys per partitions, and thus more data and you'd be more likely to hit an OOM. But there are many other possible sources of OOM, so this is definitely not the *only* solution. Sorry I can't comment in particular about Spark SQL -- hopefully somebody more knowledgeable can comment on that. On Wed, Feb 25, 2015 at 8:58 PM, java8964 mailto:java8...@hotmail.com>> wrote: Hi, Sparkers: I come from the Hadoop MapReducer world, and try to understand some internal information of spark. From the web and this list, I keep seeing people talking about increase the parallelism if you get the OOM error. I tried to read document as much as possible to understand the RDD partition, and parallelism usage in the spark. I understand that for RDD from HDFS, by default, one partition will be one HDFS block, pretty straightforward. I saw that lots of RDD operations support 2nd parameter of parallelism. This is the part confuse me. From my understand, the parallelism is totally controlled by how many cores you give to your job. Adjust that parameter, or "spark.default.parallelism" shouldn't have any impact. For example, if I have a 10G data in HDFS, and assume the block size is 128M, so we get 100 blocks/partitions in RDD. 
Now if I transfer that RDD to a Pair RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey action, using 200 as the default parallelism. Here is what I assume: * We have 100 partitions, as the data comes from 100 blocks. Most likely the spark will generate 100 tasks to read and shuffle them? * The 1000 unique keys mean the 1000 reducer group, like in MR * If I set the max core to be 50, so there will be up to 50 tasks can be run concurrently. The rest tasks just have to wait for the core, if there are 50 tasks are running. * Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys. * I don't know these 1000 partitions will be processed by how many tasks, maybe this is the parallelism parameter comes in? * No matter what parallelism this will be, there are ONLY 50 task can be run concurrently. So if we set more cores, more partitions' data will be processed in the executor (which runs more thread in this case), so more memory needs. I don't see how increasing parallelism could help the OOM in this case. * In my test case of Spark SQL, I gave 24G as the executor heap, my join between 2 big datasets keeps getting OOM. I keep increasing the "spark.default.parallelism", from 200 to 400, to 2000, even to 4000, no help. What really makes the query finish finally without OOM is after I change the "--t
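To make the reduce-side partition count from the discussion above explicit rather than relying on spark.default.parallelism, reduceByKey accepts the number of partitions as a second argument; a small sketch with illustrative numbers:

import org.apache.spark.SparkContext._             // pair-RDD functions in pre-1.3 Spark

// roughly 100 input partitions, one per HDFS block in the example above
val pairs = sc.textFile("hdfs:///data/10g-file").map(line => (line.split(",")(0), 1))

// shuffle into exactly 200 partitions, no matter how many distinct keys exist
val counts = pairs.reduceByKey(_ + _, 200)
counts.partitions.length                           // 200

// without the explicit argument, spark.default.parallelism (or the parent's
// partition count) decides how many reduce-side partitions are used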
Re: NullPointerException in ApplicationMaster
Look at the trace again. It is a very weird error. The SparkSubmit is running on the client side, but YarnClusterSchedulerBackend is supposed to be running in the YARN AM. I suspect you are running with yarn-client mode, but in the JavaSparkContext you set "yarn-cluster”. As a result, the spark context initiates YarnClusterSchedulerBackend instead of YarnClientSchedulerBackend, which I think is the root cause. Thanks. Zhan Zhang On Feb 25, 2015, at 1:53 PM, Zhan Zhang mailto:zzh...@hortonworks.com>> wrote: Hi Mate, When you initialize the JavaSparkContext, you don’t need to specify the mode “yarn-cluster”. I suspect that is the root cause. Thanks. Zhan Zhang On Feb 25, 2015, at 10:12 AM, gulyasm mailto:mgulya...@gmail.com>> wrote: JavaSparkContext.
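A sketch of the suggestion (the class name and the job body are made up): leave the master out of the application code and let spark-submit supply it, so the same jar runs in either yarn-client or yarn-cluster mode.

import org.apache.spark.{SparkConf, SparkContext}

object MyYarnApp {                                  // hypothetical application
  def main(args: Array[String]): Unit = {
    // no setMaster here; pass --master yarn-cluster (or yarn-client) to spark-submit
    val conf = new SparkConf().setAppName("MyYarnApp")
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 100).count())
    } finally {
      sc.stop()
    }
  }
}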
Re: NullPointerException in ApplicationMaster
Hi Mate, When you initialize the JavaSparkContext, you don’t need to specify the mode “yarn-cluster”. I suspect that is the root cause. Thanks. Zhan Zhang On Feb 25, 2015, at 10:12 AM, gulyasm mailto:mgulya...@gmail.com>> wrote: JavaSparkContext.
Re: Can't access remote Hive table from spark
When you log in, you have root access. Then you can do “su hdfs” or any other account. Then you can create hdfs directory and change permission, etc. Thanks Zhan Zhang On Feb 11, 2015, at 11:28 PM, guxiaobo1982 mailto:guxiaobo1...@qq.com>> wrote: Hi Zhan, Yes, I found there is a hdfs account, which is created by Ambari, but what's the password for this account, how can I login under this account? Can I just change the password for the hdfs account? Regards, -- Original -- From: "Zhan Zhang";mailto:zzh...@hortonworks.com>>; Send time: Thursday, Feb 12, 2015 2:00 AM To: ""mailto:guxiaobo1...@qq.com>>; Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"mailto:user@spark.apache.org>>; "Cheng Lian"mailto:lian.cs@gmail.com>>; Subject: Re: Can't access remote Hive table from spark You need to have right hdfs account, e.g., hdfs, to create directory and assign permission. Thanks. Zhan Zhang On Feb 11, 2015, at 4:34 AM, guxiaobo1982 mailto:guxiaobo1...@qq.com>> wrote: Hi Zhan, My Single Node Cluster of Hadoop is installed by Ambari 1.7.0, I tried to create the /user/xiaobogu directory in hdfs, but both failed with user xiaobogu and root [xiaobogu@lix1 current]$ hadoop dfs -mkdir /user/xiaobogu DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. mkdir: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x root@lix1 bin]# hadoop dfs -mkdir /user/xiaobogu DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. mkdir: Permission denied: user=root, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x I notice there is a hdfs account created by ambari, but what's password for it, should I user the hdfs account to create the directory? -- Original -- From: "Zhan Zhang";mailto:zzh...@hortonworks.com>>; Send time: Sunday, Feb 8, 2015 4:11 AM To: ""mailto:guxiaobo1...@qq.com>>; Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"mailto:user@spark.apache.org>>; "Cheng Lian"mailto:lian.cs....@gmail.com>>; Subject: Re: Can't access remote Hive table from spark Yes. You need to create xiaobogu under /user and provide right permission to xiaobogu. Thanks. Zhan Zhang On Feb 7, 2015, at 8:15 AM, guxiaobo1982 mailto:guxiaobo1...@qq.com>> wrote: Hi Zhan Zhang, With the pre-bulit version 1.2.0 of spark against the yarn cluster installed by ambari 1.7.0, I come with the following errors: [xiaobogu@lix1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi--master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10 Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at lix1.bh.com/192.168.100.3:8050<http://lix1.bh.com/192.168.100.3:8050> 15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers 15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (4096 MB per container) 15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our AM 15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container 15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449) at org.apache.hadoop.hdfs.server.namenode.FSNamesyst
Re: Can't access remote Hive table from spark
You need to have right hdfs account, e.g., hdfs, to create directory and assign permission. Thanks. Zhan Zhang On Feb 11, 2015, at 4:34 AM, guxiaobo1982 mailto:guxiaobo1...@qq.com>> wrote: Hi Zhan, My Single Node Cluster of Hadoop is installed by Ambari 1.7.0, I tried to create the /user/xiaobogu directory in hdfs, but both failed with user xiaobogu and root [xiaobogu@lix1 current]$ hadoop dfs -mkdir /user/xiaobogu DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. mkdir: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x root@lix1 bin]# hadoop dfs -mkdir /user/xiaobogu DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. mkdir: Permission denied: user=root, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x I notice there is a hdfs account created by ambari, but what's password for it, should I user the hdfs account to create the directory? -- Original -- From: "Zhan Zhang";mailto:zzh...@hortonworks.com>>; Send time: Sunday, Feb 8, 2015 4:11 AM To: ""mailto:guxiaobo1...@qq.com>>; Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"mailto:user@spark.apache.org>>; "Cheng Lian"mailto:lian.cs@gmail.com>>; Subject: Re: Can't access remote Hive table from spark Yes. You need to create xiaobogu under /user and provide right permission to xiaobogu. Thanks. Zhan Zhang On Feb 7, 2015, at 8:15 AM, guxiaobo1982 mailto:guxiaobo1...@qq.com>> wrote: Hi Zhan Zhang, With the pre-bulit version 1.2.0 of spark against the yarn cluster installed by ambari 1.7.0, I come with the following errors: [xiaobogu@lix1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi--master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10 Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at lix1.bh.com/192.168.100.3:8050<http://lix1.bh.com/192.168.100.3:8050> 15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers 15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (4096 MB per container) 15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our AM 15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container 15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 
Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4221) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4194) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:813) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at org.apache.hadoop.ipc.Server$Handler$1.run(S
Re: Can't access remote Hive table from spark
Yes. You need to create xiaobogu under /user and provide right permission to xiaobogu. Thanks. Zhan Zhang On Feb 7, 2015, at 8:15 AM, guxiaobo1982 mailto:guxiaobo1...@qq.com>> wrote: Hi Zhan Zhang, With the pre-bulit version 1.2.0 of spark against the yarn cluster installed by ambari 1.7.0, I come with the following errors: [xiaobogu@lix1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi--master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10 Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at lix1.bh.com/192.168.100.3:8050<http://lix1.bh.com/192.168.100.3:8050> 15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers 15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (4096 MB per container) 15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our AM 15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container 15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4221) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4194) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:813) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2555) at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2524) at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827) at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:823) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:823) at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:816) at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1815) at org.apache.hadoop.fs.FileSystem
Re: Can't access remote Hive table from spark
Not sure spark standalone mode. But on spark-on-yarn, it should work. You can check following link: http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ Thanks. Zhan Zhang On Feb 5, 2015, at 5:02 PM, Cheng Lian mailto:lian.cs@gmail.com>> wrote: Please note that Spark 1.2.0 only support Hive 0.13.1 or 0.12.0, none of other versions are supported. Best, Cheng On 1/25/15 12:18 AM, guxiaobo1982 wrote: Hi, I built and started a single node standalone Spark 1.2.0 cluster along with a single node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and Hive node I can create and query tables inside Hive, and on remote machines I can submit the SparkPi example to the Spark master. But I failed to run the following example code : public class SparkTest { public static void main(String[] args) { String appName= "This is a test application"; String master="spark://lix1.bh.com:7077"; SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf); JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc); //sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); //sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. List rows = sqlCtx.sql("FROM src SELECT key, value").collect(); System.out.print("I got " + rows.size() + " rows \r\n"); sc.close();} } Exception in thread "main" org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$anon$2.org<http://2.org>$apache$spark$sql$catalyst$analysis$OverrideCatalog$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$anon$2.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala
Re: Spark impersonation
I think you can configure hadoop/hive to do impersonation. There is no difference between secure or insecure hadoop cluster by using kinit. Thanks. Zhan Zhang On Feb 2, 2015, at 9:32 PM, Koert Kuipers mailto:ko...@tresata.com>> wrote: yes jobs run as the user that launched them. if you want to run jobs on a secure cluster then use yarn. hadoop standalone does not support secure hadoop. On Mon, Feb 2, 2015 at 5:37 PM, Jim Green mailto:openkbi...@gmail.com>> wrote: Hi Team, Does spark support impersonation? For example, when spark on yarn/hive/hbase/etc..., which user is used by default? The user which starts the spark job? Any suggestions related to impersonation? -- Thanks, www.openkb.info<http://www.openkb.info/> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0
I think it is expected. Refer to the comments in saveAsTable "Note that this currently only works with SchemaRDDs that are created from a HiveContext”. If I understand correctly, here the SchemaRDD means those generated by HiveContext.sql, instead of applySchema. Thanks. Zhan Zhang On Jan 29, 2015, at 9:38 PM, matroyd mailto:debajyoti....@healthagen.com>> wrote: Hi, I am trying saveAsTable on SchemaRDD created from HiveContext and it fails. This is on Spark 1.2.0. Following are details of the code, command and exceptions: http://stackoverflow.com/questions/28222496/how-to-enable-sql-on-schemardd-via-the-jdbc-interface-is-it-even-possible Thanks in advance for any guidance View this message in context: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0 Sent from the Apache Spark User List mailing list archive at Nabble.com.
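A rough sketch of the distinction, assuming a Hive table src like the one in the Spark Hive examples (all other table and class names are made up): a SchemaRDD produced by hiveContext.sql can go through saveAsTable, while one built from an RDD of case classes (the applySchema/createSchemaRDD path) can instead be registered as a temp table and materialized with CREATE TABLE ... AS SELECT.

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// SchemaRDD generated by HiveContext.sql: this is the case the quoted comment covers
val fromHive = hiveContext.sql("SELECT key, value FROM src WHERE key < 10")
fromHive.saveAsTable("src_small")

// SchemaRDD built from a case-class RDD: register it and materialize through HiveQL
case class Record(key: Int, value: String)
val fromRdd = hiveContext.createSchemaRDD(sc.parallelize(Seq(Record(1, "a"), Record(2, "b"))))
fromRdd.registerTempTable("records_tmp")
hiveContext.sql("CREATE TABLE records_saved AS SELECT * FROM records_tmp")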
Re: Error when get data from hive table. Use python code.
You are running yarn-client mode. How about increase the --driver-memory and give it a try? Thanks. Zhan Zhang On Jan 29, 2015, at 6:36 PM, QiuxuanZhu mailto:ilsh1...@gmail.com>> wrote: Dear all, I have no idea when it raises an error when I run the following code. def getRow(data): return data.msg first_sql = "select * from logs.event where dt = '20150120' and et = 'ppc' LIMIT 10"#error #first_sql = "select * from hivecrawler.vip_crawler where src='xx' and dt='" + timestamp + "'"#correct sc = SparkContext(appName="parse") sqlContext = HiveContext(sc) data = sqlContext.sql(first_sql) file_target = "/tmp/test/logdd" data.map(getRow).saveAsTextFile(file_target) sc.stop() print 'stop' I submit the code by following script: /usr/local/spark-default/bin/spark-submit --master yarn-client --executor-memory 8G --num-executors 20 --executor-cores 2 --py-files a.py It would raise a error. The Spark Log shows that 15/01/30 09:46:39 ERROR metastore.RetryingHMSHandler: java.lang.OutOfMemoryError: GC overhead limit exceeded and the python code shows that: py4j.protocol.Py4JJavaError: An error occurred while calling o26.javaToPython. : java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.SingleByteCharsetConverter.toString(SingleByteCharsetConverter.java:333) at com.mysql.jdbc.ResultSetRow.getString(ResultSetRow.java:819) at com.mysql.jdbc.ByteArrayRow.getString(ByteArrayRow.java:70) at com.mysql.jdbc.ResultSetImpl.getStringInternal(ResultSetImpl.java:5811) at com.mysql.jdbc.ResultSetImpl.getString(ResultSetImpl.java:5688) at com.mysql.jdbc.ResultSetImpl.getObject(ResultSetImpl.java:4985) at org.datanucleus.store.rdbms.datasource.dbcp.DelegatingResultSet.getObject(DelegatingResultSet.java:325) at org.datanucleus.store.rdbms.datasource.dbcp.DelegatingResultSet.getObject(DelegatingResultSet.java:325) at org.datanucleus.store.rdbms.query.ResultClassROF.getResultObject(ResultClassROF.java:666) at org.datanucleus.store.rdbms.query.ResultClassROF.getObject(ResultClassROF.java:309) at org.datanucleus.store.rdbms.query.ForwardQueryResult.nextResultSetElement(ForwardQueryResult.java:181) at org.datanucleus.store.rdbms.query.ForwardQueryResult$QueryResultIterator.next(ForwardQueryResult.java:403) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.loopJoinOrderedResult(MetaStoreDirectSql.java:665) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getPartitionsViaSqlFilterInternal(MetaStoreDirectSql.java:429) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getPartitions(MetaStoreDirectSql.java:224) at org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:1563) at org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:1559) at org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2208) at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsInternal(ObjectStore.java:1559) at org.apache.hadoop.hive.metastore.ObjectStore.getPartitions(ObjectStore.java:1553) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:108) at com.sun.proxy.$Proxy25.getPartitions(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_partitions(HiveMetaStore.java:2516) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) It looks like a memory problem. But If I switch another hive table to get data, the code works fine. Any idea which direction should I start with?Config? Thanks. -- 跑不完马拉松的摄影师不是好背包客。 下个目标,该是6K的峰了吧?恩。
Re: Connect to Hive metastore (on YARN) from Spark Shell?
You can put hive-site.xml in your conf/ directory. It will connect to Hive when HiveContext is initialized. Thanks. Zhan Zhang On Jan 21, 2015, at 12:35 PM, YaoPau wrote: > Is this possible, and if so what steps do I need to take to make this happen? > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Connect-to-Hive-metastore-on-YARN-from-Spark-Shell-tp21300.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org
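Once hive-site.xml is in conf/, a quick check from the shell might look like this (the table name is just an example; the metastore connection is established when the first statement runs):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SHOW TABLES").collect().foreach(println)
hiveContext.sql("SELECT count(*) FROM my_table").collect()    // my_table is hypothetical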
OOM for HiveFromSpark example
Hi Folks, I am trying to run hive context in yarn-cluster mode, but met some error. Does anybody know what cause the issue. I use following cmd to build the distribution: ./make-distribution.sh -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4 15/01/13 17:59:42 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/01/13 17:59:42 INFO storage.BlockManagerMasterActor: Registering block manager cn122-10.l42scl.hortonworks.com:56157 with 1589.8 MB RAM, BlockManagerId(2, cn122-10.l42scl.hortonworks.com, 56157) 15/01/13 17:59:43 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 17:59:43 INFO parse.ParseDriver: Parse Completed 15/01/13 17:59:44 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/01/13 17:59:44 INFO metastore.ObjectStore: ObjectStore, initialize called 15/01/13 17:59:44 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/01/13 17:59:44 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/01/13 17:59:52 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order" 15/01/13 17:59:52 INFO metastore.MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: "@" (64), after : "". 15/01/13 17:59:53 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:53 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:59 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 17:59:59 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/01/13 18:00:00 INFO metastore.ObjectStore: Initialized ObjectStore 15/01/13 18:00:00 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa 15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added admin role in metastore 15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added public role in metastore 15/01/13 18:00:01 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty 15/01/13 18:00:01 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr. 
15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:02 INFO ql.Driver: Concurrency mode is disabled, not creating a lock manager 15/01/13 18:00:02 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 18:00:03 INFO parse.ParseDriver: Parse Completed 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Starting Semantic Analysis 15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Creating table src position=27 15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=src 15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang ip=unknown-ip-addr cmd=get_table : db=default tbl=src 15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_database: default 15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang ip=unknown-ip-addr cmd=get_database: default 15/01/13 18:00:03 INFO ql.Driver: Semantic Analysis Completed 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null) 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO ql.Driver: Starting command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO log.PerfLogger: 15/01/13 18:00:03 INFO exec.DDLTask: Default to LazySimpleSerDe for table src 15/01/13 18:00:05 INFO log.PerfLogger: Exception in thread "Driver" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Driver" -- CONFIDENTIALITY NOTICE NOTICE: This message is intended for the use of the in
Re: Driver hangs on running mllib word2vec
I think it is overflow. The training data is quite big. The algorithms scalability highly depends on the vocabSize. Even without overflow, there are still other bottlenecks, for example, syn0Global and syn1Global, each of them has vocabSize * vectorSize elements. Thanks. Zhan Zhang On Jan 5, 2015, at 7:47 PM, Eric Zhen wrote: > Hi Xiangrui, > > Our dataset is about 80GB(10B lines). > > In the driver's log, we foud this: > > INFO Word2Vec: trainWordsCount = -1610413239 > > it seems that there is a integer overflow? > > > On Tue, Jan 6, 2015 at 5:44 AM, Xiangrui Meng wrote: > How big is your dataset, and what is the vocabulary size? -Xiangrui > > On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen wrote: > > Hi, > > > > When we run mllib word2vec(spark-1.1.0), driver get stuck with 100% cup > > usage. Here is the jstack output: > > > > "main" prio=10 tid=0x40112800 nid=0x46f2 runnable > > [0x4162e000] > >java.lang.Thread.State: RUNNABLE > > at > > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847) > > at > > java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778) > > at java.io.DataOutputStream.writeInt(DataOutputStream.java:182) > > at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225) > > at > > java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064) > > at > > java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310) > > at > > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) > > at > > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) > > at > > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) > > at > > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) > > at > > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) > > at > > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) > > at > > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) > > at > > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) > > at > > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) > > at > > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) > > at > > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) > > at > > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) > > at > > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) > > at > > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) > > at > > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) > > at > > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) > > at > > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) > > at > > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) > > at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) > > at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610) > > at > > org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291) > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) > > at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290) > > at com.baidu.inf.WordCount$.main(WordCount.scala:31) > > at com.baidu.inf.WordCount.main(WordCount.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) > > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > > at java.lang.reflect.Method.invoke(Method.java:597) > > at > > org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) > > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) > > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > > > > -- > > Best Regards > > > > -- > Best Regards -- CONFIDENTIALITY NOTICE NOTICE: This message
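A back-of-the-envelope sketch of the two points raised above (the vocabulary and vector sizes are made-up numbers, not figures from the thread); it assumes the word count in that Spark version is kept in an Int, which is what the negative value in the log suggests:

// 1) Int overflow: a corpus of about 2.68 billion words would wrap to the logged value
val totalWords = 2684554057L
totalWords.toInt                              // -1610413239

// 2) driver-side memory for the two weight arrays, each vocabSize * vectorSize floats
val vocabSize  = 5000000                      // hypothetical vocabulary
val vectorSize = 100
val bytes = 2L * vocabSize * vectorSize * 4   // two Float arrays, 4 bytes per element
println(bytes / (1024 * 1024) + " MB")        // ~3814 MB just for syn0Global and syn1Global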
Re: Spark 1.2 + Avro file does not work in HDP2.2
Hi Manas, There is a small patch needed for HDP2.2. You can refer to this PR https://github.com/apache/spark/pull/3409 There are some other issues compiling against hadoop2.6. But we will fully support it very soon. You can ping me, if you want. Thanks. Zhan Zhang On Dec 12, 2014, at 11:38 AM, Manas Kar wrote: > Hi Experts, > I have recently installed HDP2.2(Depends on hadoop 2.6). > My spark 1.2 is built with hadoop 2.4 profile. > > My program has following dependencies > val avro= "org.apache.avro" % "avro-mapred" %"1.7.7" > val spark = "org.apache.spark" % "spark-core_2.10" % "1.2.0" % > "provided" > > My program to read avro files fails with the following error. What am I doing > wrong? > > > java.lang.IncompatibleClassChangeError: Found interface > org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected > at > org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47) > at > org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:133) > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) > at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) > at org.apache.spark.scheduler.Task.run(Task.scala:56) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) -- CONFIDENTIALITY NOTICE NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
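Separately from the HDP-specific patch in that PR, the "Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected" trace is the usual symptom of an avro-mapred artifact compiled against the old Hadoop 1 mapreduce API. One thing that may be worth trying in the build (sbt syntax, same versions as in the original post; whether this alone is enough on HDP 2.2 is an assumption, not something confirmed in this thread) is the hadoop2 classifier of avro-mapred:

val avro  = "org.apache.avro"  % "avro-mapred"     % "1.7.7" classifier "hadoop2"
val spark = "org.apache.spark" % "spark-core_2.10" % "1.2.0" % "provided"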
Re: Passing Java Options to Spark AM launching
Please check whether https://github.com/apache/spark/pull/3409#issuecomment-64045677 solves the problem for launching the AM. Thanks. Zhan Zhang On Dec 1, 2014, at 4:49 PM, Mohammad Islam wrote: > Hi, > How to pass the Java options (such as "-XX:MaxMetaspaceSize=100M") when > launching AM or task containers? > > This is related to running Spark on Yarn (Hadoop 2.3.0). In the Map-reduce case, > setting a property such as > "mapreduce.map.java.opts" would do the work. > > Any help would be highly appreciated. > > Regards, > Mohammad
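Besides the AM-launch fix referenced in that PR, executor JVM options are normally passed through the extraJavaOptions settings; a sketch, assuming a Spark version where these keys are available:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("jvm-opts-example")                // hypothetical app
  .set("spark.executor.extraJavaOptions", "-XX:MaxMetaspaceSize=100M")

// driver/AM options generally have to be given to spark-submit instead
// (e.g. --conf spark.driver.extraJavaOptions=... or --driver-java-options),
// since the driver JVM is already running by the time this code executes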
Re: Spark SQL Hive Version
The original spark-project hive-0.13.1 has some problems with packaging causing version conflicts, and hive-0.13.1a is repackaged to solve the problem. They share the same official hive source code release 0.13.1, with unnecessary packages removed from the original official hive release package. You can refer to https://github.com/apache/spark/pull/2685 for the whole story. Thanks. Zhan Zhang On Nov 5, 2014, at 4:47 PM, Cheng, Hao wrote: > Hi, all, I noticed that when compiling the SparkSQL with profile > “hive-0.13.1”, it will fetch the Hive version of 0.13.1a under groupId > “org.spark-project.hive”, what’s the difference with the one of > “org.apache.hive”? And where can I get the source code for re-compiling? > > Thanks, > Cheng Hao
Re: Use RDD like a Iterator
RDD.toLocalIterator returns the partitions one by one, but with all elements in each partition materialized, so within a partition it is not lazily calculated. Given the design of spark, it is very hard to maintain the state of an iterator across runJob calls. def toLocalIterator: Iterator[T] = { def collectPartition(p: Int): Array[T] = { sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head } (0 until partitions.length).iterator.flatMap(i => collectPartition(i)) } Thanks. Zhan Zhang On Oct 29, 2014, at 3:43 AM, Yanbo Liang wrote: > RDD.toLocalIterator() is the suitable solution. > But I doubt whether it conforms with the design principle of spark and RDD. > All RDD transforms are lazily computed until they end with some actions. > > 2014-10-29 15:28 GMT+08:00 Sean Owen : > Call RDD.toLocalIterator()? > > https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html > > On Wed, Oct 29, 2014 at 4:15 AM, Dai, Kevin wrote: > > Hi, ALL > > > > > > > > I have a RDD[T], can I use it like an iterator. > > > > That means I can compute every element of this RDD lazily. > > > > > > > > Best Regards, > > > > Kevin. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org
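A small usage sketch: toLocalIterator pulls one partition at a time to the driver, so peak driver memory is bounded by the largest partition rather than by the whole RDD.

val rdd = sc.parallelize(1 to 1000000, 100)

// runs one job per partition under the hood, as in the implementation shown above
rdd.toLocalIterator.take(5).foreach(println)

// by contrast, collect() materializes every partition on the driver at once
// val all = rdd.collect()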
Re: how to retrieve the value of a column of type date/timestamp from a Spark SQL Row
Can you use row(i).asInstanceOf[...]? Thanks. Zhan Zhang On Oct 28, 2014, at 5:03 PM, Mohammed Guller wrote: > Hi – > > The Spark SQL Row class has methods such as getInt, getLong, getBoolean, > getFloat, getDouble, etc. However, I don’t see a getDate method. So how can > one retrieve a date/timestamp type column from a result set? > > Thanks, > Mohammed
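A sketch of the suggestion, assuming the column comes back as java.sql.Timestamp (the exact runtime class can depend on the data source); the table and column names are hypothetical:

import java.sql.Timestamp

val rows = sqlContext.sql("SELECT name, created_at FROM events")
val first = rows.first()

// index-based access plus a cast, as suggested above
val ts = first(1).asInstanceOf[Timestamp]

// a date column would be cast the same way
// val d = first(2).asInstanceOf[java.sql.Date]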
Re: Use RDD like a Iterator
I think it is already lazily computed, or do you mean something else? Following is the signature of compute in RDD def compute(split: Partition, context: TaskContext): Iterator[T] Thanks. Zhan Zhang On Oct 28, 2014, at 8:15 PM, Dai, Kevin wrote: > Hi, ALL > > I have a RDD[T], can I use it like an iterator. > That means I can compute every element of this RDD lazily. > > Best Regards, > Kevin.
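A tiny illustration of that laziness: the function passed to map only runs once an action asks for elements.

val rdd = sc.parallelize(1 to 10, 2).map { x =>
  println(s"computing $x")    // side effect just to show when work happens
  x * 2
}
// nothing has been printed yet: compute() has not been called
rdd.take(3)                   // only now is the first partition computed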
Re: run multiple spark applications in parallel
You can set your executor number with --num-executors. Also, changing to yarn-client saves you one container for the driver. Then check your yarn resource manager to make sure there are more containers available to serve your extra apps. Thanks. Zhan Zhang On Oct 28, 2014, at 5:31 PM, Soumya Simanta wrote: > Maybe changing --master yarn-cluster to --master yarn-client will help. > > > On Tue, Oct 28, 2014 at 7:25 PM, Josh J wrote: > Sorry, I should've included some stats with my email > > I execute each job in the following manner > > ./bin/spark-submit --class CLASSNAME --master yarn-cluster --driver-memory 1g > --executor-memory 1g --executor-cores 1 UBER.JAR ${ZK_PORT_2181_TCP_ADDR} > my-consumer-group1 1 > > > > The box has > > 24 CPUs, Intel(R) Xeon(R) CPU E5-2420 v2 @ 2.20GHz > > 32 GB RAM > > > > Thanks, > > Josh > > > On Tue, Oct 28, 2014 at 4:15 PM, Soumya Simanta > wrote: > Try reducing the resources (cores and memory) of each application. > > > > > On Oct 28, 2014, at 7:05 PM, Josh J wrote: > > > > Hi, > > > > How do I run multiple spark applications in parallel? I tried to run on > > yarn cluster, though the second application submitted does not run. > > > > Thanks, > > Josh > >