Re: Convert each partition of RDD to Dataframe
There is no obvious relationship between the partitions or tables, so maybe try running them as separate jobs; that way they can run at the same time and make full use of the cluster resources.

prosp4300

On 02/27/2020 22:50, Manjunath Shetty H wrote:
Hi Enrico,
In that case, how do we make effective use of all nodes in the cluster? Also, what is your opinion on the options below?
1. Create 10 DataFrames sequentially in the driver program and transform/write them to HDFS one after the other.
2. The current approach mentioned in the previous mail.
What will the performance implications be?
Regards,
Manjunath

From: Enrico Minack
Sent: Thursday, February 27, 2020 7:57 PM
To: user@spark.apache.org
Subject: Re: Convert each partition of RDD to Dataframe

Hi Manjunath,
Why not create 10 DataFrames loading the different tables in the first place?
Enrico

On 27.02.20 at 14:53, Manjunath Shetty H wrote:
Hi Vinodh,
Thanks for the quick response. I didn't quite get what you meant; any reference or snippet would be helpful.
To explain the problem further: I have 10 partitions, and each partition loads data from a different table on a different SQL shard. Most of the partitions have different schemas. Before persisting the data I want to do some column-level manipulation using DataFrames. That is why I want to create 10 DataFrames (one per partition) that map to the 10 different tables/shards from a single RDD.
Regards,
Manjunath

From: Charles vinodh
Sent: Thursday, February 27, 2020 7:04 PM
To: manjunathshe...@live.com
Cc: user
Subject: Re: Convert each partition of RDD to Dataframe

Just split the single RDD into multiple individual RDDs using a filter operation, and then convert each individual RDD to its respective DataFrame.

On Thu, Feb 27, 2020, 7:29 AM Manjunath Shetty H wrote:
Hello All,
In Spark I am creating custom partitions with a custom RDD; each partition will have a different schema.
Now, in the transformation step, we need to get the schema and run some DataFrame SQL queries per partition, because each partition's data has a different schema. How do we get a DataFrame per partition of an RDD? As of now I am doing foreachPartition on the RDD, converting the partition's Iterator to a List, and converting that to a DataFrame. But the problem is that converting the Iterator to a List brings all of the data into memory, which might crash the process. Is there any known way to do this? Or is there any way to handle custom partitions with DataFrames instead of using an RDD? I am using Spark version 1.6.2. Any pointers would be helpful. Thanks in advance.
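Charles's "filter per shard" suggestion avoids the Iterator-to-List problem entirely, because each filter stays lazy and distributed: in Spark 1.6 it would be one rdd.filter(...) per shard followed by sqlContext.createDataFrame(filtered, schemaForShard). The plain-Python sketch below only illustrates the split logic itself; the (shard_id, payload) record layout is a hypothetical stand-in for however the custom RDD tags its rows.

```python
# Plain-Python sketch of the "filter per shard" split, with records
# modeled as (shard_id, payload) pairs. In Spark the list comprehension
# below corresponds to a lazy rdd.filter(lambda r: r.shard == i), so no
# partition is ever materialized on one machine.

def split_by_shard(records, num_shards):
    """Return one list of payloads per shard id, mimicking one filter per shard."""
    return [
        [payload for shard, payload in records if shard == i]
        for i in range(num_shards)
    ]

records = [(0, "a"), (1, "b"), (0, "c"), (2, "d")]
per_shard = split_by_shard(records, 3)  # one group per shard, ready for createDataFrame
```

Each resulting group would then be paired with its own schema when building the per-shard DataFrame.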
Re:Re: Custom Metric Sink on Executor Always ClassNotFound
Thanks a lot for the explanation. Spark declares the Sink trait as package-private, which is why the package name looks weird; the metrics system does not seem to be intended for extension:

package org.apache.spark.metrics.sink
private[spark] trait Sink

Making the custom sink class available on every executor's system classpath is exactly what an application developer wants to avoid, because the sink is only required for one specific application, and such a setup can be difficult to maintain. If it were possible to get the MetricsSystem at the executor level and register the custom sink there, the problem could be resolved in a better way, but I am not sure how to achieve this. Thanks a lot.

At 2018-12-21 05:53:31, "Marcelo Vanzin" wrote:
>First, it's really weird to use "org.apache.spark" for a class that is
>not in Spark.
>
>For executors, the jar file of the sink needs to be in the system
>classpath; the application jar is not in the system classpath, so that
>does not work. There are different ways for you to get it there, most
>of them manual (YARN is, I think, the only RM supported in Spark where
>the application itself can do it).
>
>On Thu, Dec 20, 2018 at 1:48 PM prosp4300 wrote:
>>
>> Hi, Spark Users
>>
>> I'm playing with Spark metric monitoring and want to add a custom sink, an HttpSink that sends the metrics through a RESTful API.
>> A subclass of Sink, "org.apache.spark.metrics.sink.HttpSink", is created and packaged within the application jar.
>>
>> It works for the driver instance, but once enabled for executor instances, the following ClassNotFoundException is thrown. This seems to be because the executor's MetricsSystem is started very early, before the application jar is loaded.
>>
>> I wonder, is there any way or best practice to add a custom sink for executor instances?
>> 18/12/21 04:58:32 ERROR MetricsSystem: Sink class org.apache.spark.metrics.sink.HttpSink cannot be instantiated
>> 18/12/21 04:58:32 WARN UserGroupInformation: PriviledgedActionException as:yarn (auth:SIMPLE) cause:java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.HttpSink
>> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1933)
>>     at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
>>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
>>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
>>     at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
>> Caused by: java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.HttpSink
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:348)
>>     at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
>>     at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
>>     at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
>>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>>     at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
>>     at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
>>     at org.apache.spark.SparkEnv$.create(SparkEnv.scala:366)
>>     at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
>>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:223)
>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
>>     ... 4 more
>> 18/12/21 04:58:00 ERROR org.apache.spark.metrics.MetricsSystem.logError:70 - Sink class org.apache.spark.metrics.sink.HttpSink cannot be instantiated
>
>--
>Marcelo
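One concrete way to act on Marcelo's advice is to stage the sink jar on every node yourself and prepend it to the executors' system classpath via spark-defaults.conf. This is only a sketch: the path below is a hypothetical example, and the jar must already exist at that location on each node.

```properties
# Prepended to the executor JVM's classpath, so the class is visible when
# MetricsSystem starts, i.e. before the application jar is downloaded.
spark.executor.extraClassPath=/opt/spark-extras/http-sink.jar
```

The same effect can be achieved per-submit with --conf on spark-submit instead of editing spark-defaults.conf.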
Custom Metric Sink on Executor Always ClassNotFound
Hi, Spark Users

I'm playing with Spark metric monitoring and want to add a custom sink, an HttpSink that sends the metrics through a RESTful API. A subclass of Sink, "org.apache.spark.metrics.sink.HttpSink", is created and packaged within the application jar.

It works for the driver instance, but once enabled for executor instances, the following ClassNotFoundException is thrown. This seems to be because the executor's MetricsSystem is started very early, before the application jar is loaded.

I wonder, is there any way or best practice to add a custom sink for executor instances?

18/12/21 04:58:32 ERROR MetricsSystem: Sink class org.apache.spark.metrics.sink.HttpSink cannot be instantiated
18/12/21 04:58:32 WARN UserGroupInformation: PriviledgedActionException as:yarn (auth:SIMPLE) cause:java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.HttpSink
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1933)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.HttpSink
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
    at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:366)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:223)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    ... 4 more
18/12/21 04:58:00 ERROR org.apache.spark.metrics.MetricsSystem.logError:70 - Sink class org.apache.spark.metrics.sink.HttpSink cannot be instantiated
Spray Client VS PlayWS vs Spring RestTemplate within Spark Job
Hi, Spark Users

As far as I know, Spray Client depends on an Akka ActorSystem. Does this dependency mean it is theoretically not possible to use spray-client in a Spark job that runs on Spark executor nodes? I believe PlayWS should work as a RESTful client when run from a Spark executor; how about the traditional Spring RestTemplate? Is there any suggestion or best practice to follow for accessing RESTful services from Spark jobs? Thanks a lot.
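Whichever client library is chosen, two things usually matter on executors: the client must be constructed executor-side (not serialized from the driver, which is exactly where an ActorSystem-backed client like spray-client gets awkward), and it should be built once per partition rather than once per record. A plain-Python sketch of that mapPartitions-style shape, where make_client and call are hypothetical stand-ins for your HTTP client and request logic:

```python
def call_service_per_partition(partitions, make_client, call):
    """Mimic rdd.mapPartitions: build one client per partition and reuse it
    for every record in that partition, amortizing connection setup."""
    out = []
    for part in partitions:
        client = make_client()  # constructed on the "executor", never serialized
        out.append([call(client, record) for record in part])
    return out
```

In Spark the outer loop disappears: the body becomes the function passed to rdd.mapPartitions, with make_client invoked inside it.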
Re:Do we still need to use Kryo serializer in Spark 1.6.2 ?
The way to use the Kryo serializer in PySpark is similar to Scala, as shown below. The only difference is the lack of the convenience method conf.registerKryoClasses, but it should be easy to write one yourself:

conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.classesToRegister", "com.example.YourClassA,com.example.YourClassB")

At 2016-08-23 02:00:41, "Eric Ho" wrote:
I heard that Kryo will get phased out at some point, but I'm not sure in which Spark release. I'm using PySpark; does anyone have any docs on how to call / use the Kryo serializer in PySpark?
Thanks.
--
-eric ho
Re:Log rollover in spark streaming jobs
Spark on YARN supports customized log4j configuration by default, and a RollingFileAppender can be used to avoid disk overflow, as documented below:

"If you need a reference to the proper location to put log files in YARN so that YARN can properly display and aggregate them, use spark.yarn.app.container.log.dir in your log4j.properties. For example, log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log. For streaming applications, configuring RollingFileAppender and setting the file location to YARN's log directory will avoid disk overflow caused by large log files, and logs can be accessed using YARN's log utility."

You can get more information here: https://spark.apache.org/docs/latest/running-on-yarn.html#configuration

At 2016-08-23 18:44:29, "Pradeep" wrote:
>Hi All,
>
>I am running Java Spark streaming jobs in yarn-client mode. Is there a way I can manage log rollover on the edge node? I have a 10-second batch and the log file volume is huge.
>
>Thanks,
>Pradeep
>
>-
>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
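Putting the quoted documentation together, a minimal log4j.properties for rollover under YARN might look like the sketch below. The appender name matches the documentation's example; the size limit, backup count, and pattern are illustrative choices, not values from the docs.

```properties
log4j.rootLogger=INFO, file_appender

log4j.appender.file_appender=org.apache.log4j.RollingFileAppender
# YARN substitutes the container's log directory here, so YARN's log
# utilities can still display and aggregate the file.
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
# Roll after 100MB and keep at most 10 old files, bounding disk usage.
log4j.appender.file_appender.MaxFileSize=100MB
log4j.appender.file_appender.MaxBackupIndex=10
log4j.appender.file_appender.layout=org.apache.log4j.PatternLayout
log4j.appender.file_appender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

The file is typically shipped with --files log4j.properties on spark-submit so that every container picks it up.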
Re:Re:Re: [ANNOUNCE] Announcing Apache Spark 2.0.0
The page mentioned before is the release notes page, which is missing the links: http://spark.apache.org/releases/spark-release-2-0-0.html#mllib

At 2016-07-27 15:56:00, "prosp4300" wrote:
Additionally, in the paragraph about MLlib, three links are missing; it would be better to provide them to give us more information, thanks a lot:
"See this blog post for details"
"See this talk to learn more"
"This talk lists many of these new features."

On 2016-07-27 15:18:41, "Ofir Manor" wrote:
Hold the release! There is a minor documentation issue :) But seriously, congrats all on this massive achievement! Anyway, I think it would be very helpful to add a link to the Structured Streaming Developer Guide (Alpha) to both the documentation home page and the beginning of the "old" Spark Streaming Programming Guide, as I think many users will look for them. I had a "deep link" to that page, so I hadn't noticed until now that it is very hard to find. I'm referring to this page: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Wed, Jul 27, 2016 at 9:00 AM, Reynold Xin wrote:
Hi all,
Apache Spark 2.0.0 is the first release of the Spark 2.x line. It includes 2500+ patches from 300+ contributors.
To download Spark 2.0, head over to the download page: http://spark.apache.org/downloads.html
To view the release notes: http://spark.apache.org/releases/spark-release-2-0-0.html
(note: it can take a few hours for everything to be propagated, so you might get 404s on some download links. If you see any issues with the release notes or webpage *please contact me directly, off-list*)
Re:Re: [ANNOUNCE] Announcing Apache Spark 2.0.0
Additionally, in the paragraph about MLlib, three links are missing; it would be better to provide them to give us more information, thanks a lot:
"See this blog post for details"
"See this talk to learn more"
"This talk lists many of these new features."

On 2016-07-27 15:18:41, "Ofir Manor" wrote:
Hold the release! There is a minor documentation issue :) But seriously, congrats all on this massive achievement! Anyway, I think it would be very helpful to add a link to the Structured Streaming Developer Guide (Alpha) to both the documentation home page and the beginning of the "old" Spark Streaming Programming Guide, as I think many users will look for them. I had a "deep link" to that page, so I hadn't noticed until now that it is very hard to find. I'm referring to this page: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Wed, Jul 27, 2016 at 9:00 AM, Reynold Xin wrote:
Hi all,
Apache Spark 2.0.0 is the first release of the Spark 2.x line. It includes 2500+ patches from 300+ contributors.
To download Spark 2.0, head over to the download page: http://spark.apache.org/downloads.html
To view the release notes: http://spark.apache.org/releases/spark-release-2-0-0.html
(note: it can take a few hours for everything to be propagated, so you might get 404s on some download links. If you see any issues with the release notes or webpage *please contact me directly, off-list*)
Re:Re: ORC v/s Parquet for Spark 2.0
Thanks for this immediate correction :)

On 2016-07-27 15:17:54, "Gourav Sengupta" wrote:
Sorry, in my email above I was referring to KUDU, and there it goes: how can KUDU be right if it is first mentioned in forums with a wrong spelling? It's had a difficult beginning, where people were trying to figure out its name.
Regards,
Gourav Sengupta

On Wed, Jul 27, 2016 at 8:15 AM, Gourav Sengupta wrote:
Gosh, whether ORC came from this or that, it runs queries in HIVE with TEZ at a speed that is better than SPARK. Has anyone heard of KUDA? Its better than Parquet. But I think that someone might just start saying that KUDA has a difficult lineage as well. After all, dynastic rules dictate. Personally I feel that if something stores my data compressed and lets me access it faster, I do not care where it comes from or how difficult the childbirth was :)
Regards,
Gourav

On Tue, Jul 26, 2016 at 11:19 PM, Sudhir Babu Pothineni wrote:
Just a correction: the ORC Java libraries from Hive have been forked into Apache ORC, with vectorization on by default. I do not know if Spark is leveraging this new repo yet:
org.apache.orc : orc : 1.1.2 (pom)
Sent from my iPhone

On Jul 26, 2016, at 4:50 PM, Koert Kuipers wrote:
Parquet was inspired by Dremel but written from the ground up as a library, with support for a variety of big data systems (Hive, Pig, Impala, Cascading, etc.). It is also easy to add new support, since it's a proper library.
ORC has been enhanced while deployed at Facebook in Hive and at Yahoo in Hive. Just Hive. It didn't really exist by itself; it was part of the big Java soup that is called Hive, without an easy way to extract it. Hive does not expose proper Java APIs; it never cared for that.

On Tue, Jul 26, 2016 at 9:57 AM, Ovidiu-Cristian MARCU wrote:
Interesting opinion, thank you. Still, according to the websites, Parquet is basically inspired by Dremel (Google) [1], and part of ORC has been enhanced while deployed at Facebook and Yahoo [2]. Other than this presentation [3], do you guys know of any other benchmark?
[1] https://parquet.apache.org/documentation/latest/
[2] https://orc.apache.org/docs/
[3] http://www.slideshare.net/oom65/file-format-benchmarks-avro-json-orc-parquet

On 26 Jul 2016, at 15:19, Koert Kuipers wrote:
When Parquet came out, it was developed by a community of companies and was designed as a library to be supported by multiple big data projects. Nice. ORC, on the other hand, initially only supported Hive; it wasn't even designed as a library that could be re-used. Even today it brings in the kitchen sink of transitive dependencies. Yikes.

On Jul 26, 2016 5:09 AM, "Jörn Franke" wrote:
I think both are very similar, but with slightly different goals. While they work transparently for each Hadoop application, you need to enable specific support in the application for predicate push-down. In the end you have to check which application you are using and do some tests (with correct predicate push-down configuration). Keep in mind that both formats work best if they are sorted on the filter columns (which is your responsibility) and if their optimizations are correctly configured (min/max index, bloom filter, compression, etc.). If you need to ingest sensor data, you may want to store it first in HBase and then batch-process it into large files in ORC or Parquet format.

On 26 Jul 2016, at 04:09, janardhan shetty wrote:
Just wondering about the advantages and disadvantages of converting data into ORC or Parquet. In the Spark documentation there are numerous examples in Parquet format. Are there any strong reasons to choose Parquet over the ORC file format? Also, the current data compression is bzip2.
http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
This seems biased.
Re:[ANNOUNCE] Announcing Apache Spark 2.0.0
Congratulations!

On 2016-07-27 14:00:22, "Reynold Xin" wrote:
Hi all,
Apache Spark 2.0.0 is the first release of the Spark 2.x line. It includes 2500+ patches from 300+ contributors.
To download Spark 2.0, head over to the download page: http://spark.apache.org/downloads.html
To view the release notes: http://spark.apache.org/releases/spark-release-2-0-0.html
(note: it can take a few hours for everything to be propagated, so you might get 404s on some download links. If you see any issues with the release notes or webpage *please contact me directly, off-list*)
Re:Re: RE: Error not found value sqlContext
So it is actually a compile-time error in Eclipse. Instead of generating the jar from Eclipse, you could try using sbt to assemble your jar; it looks like your Eclipse does not recognize the Scala syntax properly.

At 2015-11-20 21:36:55, "satish chandra j" wrote:
HI All,
I am getting this error while generating the executable jar file itself in Eclipse, if the Spark application code has the "import sqlContext.implicits._" line in it. The Spark application code works fine if the above-mentioned line does not exist, as I have tested by fetching data from an RDBMS by implementing JDBCRDD.
I tried a couple of DataFrame-related methods, and most of them error out stating that the method has been overloaded.
Please let me know if any further inputs are needed for analysis.
Regards,
Satish Chandra

On Fri, Nov 20, 2015 at 5:46 PM, prosp4300 wrote:
Looks like a classpath problem. If you can provide the command you used to run your application and the environment variable SPARK_HOME, it will help others to identify the root problem.

On 2015-11-20 18:59, Satish wrote:
Hi Michael,
As my current Spark version is 1.4.0, then why does it error out with "error: not found: value sqlContext" when I have "import sqlContext.implicits._" in my Spark job?
Regards,
Satish Chandra

From: Michael Armbrust
Sent: 20-11-2015 01:36
To: satish chandra j
Cc: user; hari krishna
Subject: Re: Error not found value sqlContext

http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13

On Thu, Nov 19, 2015 at 4:19 AM, satish chandra j wrote:
HI All,
We have recently migrated from Spark 1.2.1 to Spark 1.4.0. I am fetching data from an RDBMS using JDBCRDD and registering it as a temp table to perform SQL queries.
The approach below works fine in Spark 1.2.1:
JDBCRDD --> apply map using case class --> apply createSchemaRDD --> registerTempTable --> perform SQL query
But now, as createSchemaRDD is not supported in Spark 1.4.0:
JDBCRDD --> apply map using case class with .toDF() --> registerTempTable --> perform SQL query on
the temp table
JDBCRDD --> apply map using case class --> RDD.toDF().registerTempTable --> perform SQL query on the temp table
The only solution I find everywhere is to use "import sqlContext.implicits._" after
val SQLContext = new org.apache.spark.sql.SQLContext(sc)
But it errors out with the two generic errors:
1. error: not found: value sqlContext
2. value toDF is not a member of org.apache.spark.rdd.RDD
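A side note on the two errors above: the snippet binds the context to a val named SQLContext (capital S) while the import refers to sqlContext (lowercase). Scala imports work on stable identifiers and names are case-sensitive, so the two must match exactly, for example:

```scala
// The val name and the import prefix must be the same identifier,
// letter for letter; the import then brings .toDF() into scope for RDDs.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
```

With the names aligned, both "not found: value sqlContext" and "value toDF is not a member of org.apache.spark.rdd.RDD" should go away, assuming spark-sql is on the compile classpath in Eclipse.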
Re: RE: Error not found value sqlContext
Looks like a classpath problem. If you can provide the command you used to run your application and the environment variable SPARK_HOME, it will help others to identify the root problem.

On 2015-11-20 18:59, Satish wrote:
Hi Michael,
As my current Spark version is 1.4.0, then why does it error out with "error: not found: value sqlContext" when I have "import sqlContext.implicits._" in my Spark job?
Regards,
Satish Chandra

From: Michael Armbrust
Sent: 20-11-2015 01:36
To: satish chandra j
Cc: user; hari krishna
Subject: Re: Error not found value sqlContext

http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13

On Thu, Nov 19, 2015 at 4:19 AM, satish chandra j wrote:
HI All,
We have recently migrated from Spark 1.2.1 to Spark 1.4.0. I am fetching data from an RDBMS using JDBCRDD and registering it as a temp table to perform SQL queries.
The approach below works fine in Spark 1.2.1:
JDBCRDD --> apply map using case class --> apply createSchemaRDD --> registerTempTable --> perform SQL query
But now, as createSchemaRDD is not supported in Spark 1.4.0:
JDBCRDD --> apply map using case class with .toDF() --> registerTempTable --> perform SQL query on the temp table
JDBCRDD --> apply map using case class --> RDD.toDF().registerTempTable --> perform SQL query on the temp table
The only solution I find everywhere is to use "import sqlContext.implicits._" after
val SQLContext = new org.apache.spark.sql.SQLContext(sc)
But it errors out with the two generic errors:
1. error: not found: value sqlContext
2. value toDF is not a member of org.apache.spark.rdd.RDD
RE:RE: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL
By the way, turning off code generation could be an option to try; sometimes code generation can introduce slowness.

On 2015-09-11 15:58, Cheng, Hao wrote:
Can you confirm whether the query really runs in cluster mode, not local mode? Can you print the call stack of the executor while the query is running?
BTW: spark.shuffle.reduceLocality.enabled is a configuration of Spark core, not Spark SQL.

From: Todd [mailto:bit1...@163.com]
Sent: Friday, September 11, 2015 3:39 PM
To: Todd
Cc: Cheng, Hao; Jesse F Chen; Michael Armbrust; user@spark.apache.org
Subject: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

I added the following two options:
spark.sql.planner.sortMergeJoin=false
spark.shuffle.reduceLocality.enabled=false
But it still performs the same as without setting them. One other thing: on the Spark UI, when I click the SQL tab, it shows an empty page with only the header title 'SQL'; there is no table showing queries and execution plan information.

At 2015-09-11 14:39:06, "Todd" wrote:
Thanks Hao. Yes, it is still as slow as SMJ. Let me try the options you suggested.

At 2015-09-11 14:34:46, "Cheng, Hao" wrote:
You mean the performance is still as slow as with SMJ in Spark 1.5? Can you set spark.shuffle.reduceLocality.enabled=false when you start spark-shell/spark-sql? It's a new feature in Spark 1.5 and it's true by default, but we found it probably causes the performance to drop dramatically.

From: Todd [mailto:bit1...@163.com]
Sent: Friday, September 11, 2015 2:17 PM
To: Cheng, Hao
Cc: Jesse F Chen; Michael Armbrust; user@spark.apache.org
Subject: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

Thanks Hao for the reply. I turned merge sort join off; the physical plan is below, but the performance is roughly the same as with it on...
== Physical Plan ==
TungstenProject [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
  TungstenExchange hashpartitioning(ss_item_sk#2)
   ConvertToUnsafe
    Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_list_price#12,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
  TungstenExchange hashpartitioning(ss_item_sk#25)
   ConvertToUnsafe
    Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#25]
Code Generation: true

At 2015-09-11 13:48:23, "Cheng, Hao" wrote:
It is not a big surprise that SMJ is slower than HashJoin, as we do not fully utilize the sorting yet; more details can be found at https://issues.apache.org/jira/browse/SPARK-2926. Anyway, can you disable sort merge join with "spark.sql.planner.sortMergeJoin=false" in Spark 1.5 and run the query again? In our previous testing, it was about 20% slower with sort merge join. I am not sure if anything else slows down the performance.
Hao

From: Jesse F Chen [mailto:jfc...@us.ibm.com]
Sent: Friday, September 11, 2015 1:18 PM
To: Michael Armbrust
Cc: Todd; user@spark.apache.org
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

Could this be a build issue (i.e., sbt package)? If I run the same jar built for 1.4.1 on 1.5, I am seeing a large regression in queries too (all other things identical)... I am curious: to build 1.5 (when it isn't released yet), what do I need to do with the build.sbt file? Are there any special parameters I should be using to make sure I load the latest Hive dependencies?
From: Michael Armbrust
To: Todd
Cc: "user@spark.apache.org"
Date: 09/10/2015 11:07 AM
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3, so this is surprising. In my experiments Spark 1.5 is either the same as or faster than 1.4, with only small exceptions. A few thoughts:
- 600 partitions is probably way too many for 6G of data.
- Providing the output of explain for both runs would be helpful whenever reporting performance changes.

On Thu, Sep 10, 2015 at 1:24 AM, Todd wrote:
Hi,
I am using data generated with spark-sql-perf (https://github.com/databricks/spark-sql-perf) to test Spark SQL performance (Spark on YARN, with 10 nodes) with the following code (the table store_sales is about 90 million records, 6G in size):
val outputDir="hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
val name="store_sales"
sqlContext.sql(
 s"""
 |CREATE TEMPORARY TABLE ${name}
 |USING org.apache.spark.sql.parquet
 |OPTIONS (
 | path '${outputDir}
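For anyone retracing this thread, the two switches under discussion can be collected in one place. This is a sketch in spark-defaults.conf form for Spark 1.5, where both settings existed and defaulted to true; they can equally be passed with --conf at launch.

```properties
# Disable the Spark 1.5 reduce-locality scheduling feature suspected above.
spark.shuffle.reduceLocality.enabled=false
# Fall back from sort merge join to the Spark 1.4-style shuffled hash join.
spark.sql.planner.sortMergeJoin=false
```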
Re: Does spark sql support column indexing
The answer is simply no, but I hope someone can give deeper insight or a meaningful reference.

On 2015-08-19 15:21, Todd wrote:
I can't find any discussion on whether Spark SQL supports column indexing. If it does, is there a guide on how to do it? Thanks.
Re: Spark DataFrames uses too many partition
Hi,
I want to know how you coalesce the partitions to one to improve performance. Thanks.

On 2015-08-11 23:31, Al M wrote:
I am using DataFrames with Spark 1.4.1. I really like DataFrames, but the partitioning makes no sense to me.
I am loading lots of very small files and joining them together. Every file is loaded by Spark with just one partition. Each time I join two small files, the partition count increases to 200. This makes my application take 10x as long as when I coalesce everything to 1 partition after each join.
With normal RDDs it would not expand the partitions out to 200 after joining two files with one partition each; it would either keep it at one or expand it to two. Why do DataFrames expand out the partitions so much?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214.html
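The 200 is almost certainly spark.sql.shuffle.partitions, the post-shuffle partition count Spark SQL uses for every join and aggregation, which defaults to 200. Rather than calling coalesce(1) after each join, it may be simpler to lower that default; the value 8 below is an arbitrary illustration to be tuned to the data size.

```properties
# spark-defaults.conf; equivalently, at runtime:
#   sqlContext.setConf("spark.sql.shuffle.partitions", "8")
spark.sql.shuffle.partitions=8
```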
Re:SparkSQL 1.4 can't accept registration of UDF?
What's the result of "list jar" in both 1.3.1 and 1.4.0? Please check if there is any difference.

At 2015-07-15 08:10:44, "ogoh" wrote:
>Hello,
>I am using SparkSQL along with the ThriftServer so that we can access it using Hive
>queries.
>With Spark 1.3.1 I can register a UDF function, but Spark 1.4.0 doesn't work
>for that. The jar of the UDF is the same.
>Below are the logs.
>I appreciate any advice.
>
>
>== With Spark 1.4
>Beeline version 1.4.0 by Apache Hive
>
>0: jdbc:hive2://localhost:1> add jar
>hdfs:///user/hive/lib/dw-udf-2015.06.06-SNAPSHOT.jar;
>
>0: jdbc:hive2://localhost:1> create temporary function parse_trace as
>'com.mycom.dataengine.udf.GenericUDFParseTraceAnnotation';
>
>15/07/14 23:49:43 DEBUG transport.TSaslTransport: writing data length: 206
>
>15/07/14 23:49:43 DEBUG transport.TSaslTransport: CLIENT: reading data
>length: 201
>
>Error: org.apache.spark.sql.execution.QueryExecutionException: FAILED:
>Execution Error, return code 1 from
>org.apache.hadoop.hive.ql.exec.FunctionTask (state=,code=0)
>
>
>== With Spark 1.3.1:
>
>Beeline version 1.3.1 by Apache Hive
>
>0: jdbc:hive2://localhost:10001> add jar
>hdfs:///user/hive/lib/dw-udf-2015.06.06-SNAPSHOT.jar;
>
>+-+
>| Result |
>+-+
>+-+
>No rows selected (1.313 seconds)
>
>0: jdbc:hive2://localhost:10001> create temporary function parse_trace as
>'com.mycom.dataengine.udf.GenericUDFParseTraceAnnotation';
>
>+-+
>| result |
>+-+
>+-+
>No rows selected (0.999 seconds)
>
>
>=== The logs of ThriftServer of Spark 1.4.0
>
>15/07/14 23:49:43 INFO SparkExecuteStatementOperation: Running query 'create
>temporary function parse_trace as
>'com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation''
>
>15/07/14 23:49:43 INFO ParseDriver: Parsing command: create temporary
>function parse_trace as
>'com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation'
>
>15/07/14 23:49:43 INFO ParseDriver: Parse Completed
>
>15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver>
>
>15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver>
>
>15/07/14 23:49:43 INFO Driver: Concurrency mode is disabled, not creating a
>lock manager
>
>15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver>
>
>15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver>
>
>15/07/14 23:49:43 INFO ParseDriver: Parsing command: create temporary
>function parse_trace as
>'com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation'
>
>15/07/14 23:49:43 INFO ParseDriver: Parse Completed
>
>15/07/14 23:49:43 INFO PerfLogger: start=1436917783106 end=1436917783106 duration=0
>from=org.apache.hadoop.hive.ql.Driver>
>
>15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver>
>
>15/07/14 23:49:43 INFO HiveMetaStore: 2: get_database: default
>
>15/07/14 23:49:43 INFO audit: ugi=anonymous ip=unknown-ip-addr
>cmd=get_database: default
>
>15/07/14 23:49:43 INFO HiveMetaStore: 2: Opening raw store with
>implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
>
>15/07/14 23:49:43 INFO ObjectStore: ObjectStore, initialize called
>
>15/07/14 23:49:43 INFO MetaStoreDirectSql: MySQL check failed, assuming we
>are not on mysql: Lexical error at line 1, column 5. Encountered: "@" (64),
>after : "".
> >15/07/14 23:49:43 INFO Query: Reading in results for query >"org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is >closing > >15/07/14 23:49:43 INFO ObjectStore: Initialized ObjectStore > >15/07/14 23:49:43 INFO FunctionSemanticAnalyzer: analyze done > >15/07/14 23:49:43 INFO Driver: Semantic Analysis Completed > >15/07/14 23:49:43 INFO PerfLogger: start=1436917783106 end=1436917783114 duration=8 >from=org.apache.hadoop.hive.ql.Driver> > >15/07/14 23:49:43 INFO Driver: Returning Hive schema: >Schema(fieldSchemas:null, properties:null) > >15/07/14 23:49:43 INFO PerfLogger: start=1436917783106 end=1436917783114 duration=8 >from=org.apache.hadoop.hive.ql.Driver> > >15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver> > >15/07/14 23:49:43 INFO Driver: Starting command: create temporary function >parse_trace as 'com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation' > >15/07/14 23:49:43 INFO PerfLogger: start=1436917783105 end=1436917783115 duration=10 >from=org.apache.hadoop.hive.ql.Driver> > >15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver> > >15/07/14 23:49:43 INFO PerfLogger: from=org.apache.hadoop.hive.ql.Driver> > >15/07/14 23:49:43 ERROR Task: FAILED: Class >com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation not found > >15/07/14 23:49:43 INFO FunctionTask: create function: >java.lang.ClassNotFoundException: >com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation > >at java.net.URLClassLoader$1.run(URLClassLoader.java:372) > >at java.net.URLClassLoader$1.run(URLClassLoader.java:361) > > > >-- >View this message in context: >http://apache-spark-us
Re: Spark query
As mentioned in the Spark SQL programming guide, Spark SQL supports Hive UDFs. Please take a look at the built-in date UDFs of Hive below; getting the day of year should be as simple as in an existing RDBMS. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions

At 2015-07-09 12:02:44, "Ravisankar Mani" wrote:
Hi everyone,
I can't get 'day of year' when using a Spark query. Can you help with any way to achieve day of year?
Regards,
Ravi
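Since Hive of that era had no dedicated day-of-year builtin, one sketch is to derive it from the built-in datediff, year, and concat functions. The table name `events` and column `dt` (a 'yyyy-MM-dd' date string) are hypothetical; this assumes a HiveContext:

```scala
// day-of-year = days elapsed since Jan 1 of the same year, plus 1
// e.g. for dt = '2015-07-09' this yields 190
val df = sqlContext.sql(
  """SELECT dt,
            datediff(dt, concat(year(dt), '-01-01')) + 1 AS day_of_year
     FROM events""")
```

The same expression works in Beeline against the Thrift server, since it is plain HiveQL.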
Re: how to use DoubleRDDFunctions on mllib Vector?
It seems what Feynman mentioned is the source code instead of the documentation; vectorMean is private. See https://github.com/apache/spark/blob/v1.3.0/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala

At 2015-07-09 10:10:58, "诺铁" wrote:
thanks, I understand now. But I can't find mllib.clustering.GaussianMixture#vectorMean; what version of Spark do you use?

On Thu, Jul 9, 2015 at 1:16 AM, Feynman Liang wrote:
An RDD[Double] is an abstraction for a large collection of doubles, possibly distributed across multiple nodes. The DoubleRDDFunctions are there for performing mean and variance calculations across this distributed dataset. In contrast, a Vector is not distributed and fits on your local machine. You would be better off computing these quantities on the Vector directly (see mllib.clustering.GaussianMixture#vectorMean for an example of how to compute the mean of a vector).

On Tue, Jul 7, 2015 at 8:26 PM, 诺铁 wrote:
hi, there are some useful functions in DoubleRDDFunctions which I can use if I have RDD[Double], e.g. mean and variance. Vector doesn't have such methods; how can I convert a Vector to RDD[Double]? Or, maybe better, can I call mean directly on a Vector?
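Since a local Vector fits in memory, the statistics can be computed directly on its values without any RDD; a minimal sketch against the Spark 1.x MLlib API (the sample values are illustrative):

```scala
import org.apache.spark.mllib.linalg.Vectors

val v = Vectors.dense(1.0, 2.0, 3.0, 4.0)
val values = v.toArray

// mean of the vector's entries: here (1+2+3+4)/4 = 2.5
val mean = values.sum / values.length

// population variance, matching what RDD[Double].variance computes
val variance = values.map(x => math.pow(x - mean, 2)).sum / values.length
```

This mirrors what the private GaussianMixture#vectorMean helper does internally, without depending on a private API.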
Re: Maintain Persistent Connection with Hive meta store
Each time you run the jar, a new JVM is started; maintaining a connection across different JVMs is not the right way to think about it.

> each time when I run that jar it tries to make connection with hive metastore

At 2015-07-07 17:07:06, "wazza" wrote:
>Hi, I am new to Apache Spark and I have tried to query Hive tables using
>Apache Spark SQL. First I tried it in spark-shell, where I can query 1
>lakh records from a Hive table within a second. Then I tried it in Java
>code, which always takes more than 10 seconds, and I have noted that each time
>I run that jar it tries to make a connection with the Hive metastore. Can
>anyone tell me how to maintain the connection between Apache Spark and the Hive
>metastore, or else how to achieve the same in Java?
>
>--
>View this message in context:
>http://apache-spark-user-list.1001560.n3.nabble.com/Maintain-Persistent-Connection-with-Hive-meta-store-tp23664.html
Re: HiveContext throws org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Hi bdev,
Derby is the default embedded DB for the Hive metastore if you do not specify hive.metastore.uris. Please take a look at the lib directory of Hive; you can find the Derby jar there. Spark does not require Derby by default.

At 2015-07-07 17:07:28, "bdev" wrote:
>Just trying to get started with Spark and attempting to use HiveContext via
>spark-shell to interact with existing Hive tables on my CDH cluster, but I keep
>running into the errors below when I do 'hiveContext.sql("show tables")'.
>Wanted to know which JARs need to be included to have this working. Thanks!
>
>java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:472)
>    at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:229)
>    at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:225)
>    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:241)
>    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:240)
>    at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:86)
>    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
>    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
>    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
>    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
>    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
>    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
>    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
>    at $iwC$$iwC$$iwC.<init>(<console>:48)
>    at $iwC$$iwC.<init>(<console>:50)
>    at $iwC.<init>(<console>:52)
>    at <init>(<console>:54)
>    at .<init>(<console>:58)
>    at .<clinit>()
>    at .<init>(<console>:7)
>    at .<clinit>()
>    at $print(<console>)
>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>    at java.lang.reflect.Method.invoke(Method.java:606)
>    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
>    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
>    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
>    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
>    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
>    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
>    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
>    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
>    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
>    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
>    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
>    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
>    at org.apache.spark.repl.Main$.main(Main.scala:31)
>    at org.apache.spark.repl.Main.main(Main.scala)
>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>    at java.lang.reflect.Method.invoke(Method.java:606)
>    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1488)
>    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:64)
>    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret
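To point spark-shell at the cluster's existing metastore instead of an embedded Derby one, the usual approach is to place a hive-site.xml on Spark's classpath (e.g. in Spark's conf/ directory) that sets hive.metastore.uris. A minimal sketch, where the host name is hypothetical:

```xml
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <!-- Thrift endpoint of the running Hive metastore service;
         9083 is the conventional default port -->
    <value>thrift://metastore-host.example.com:9083</value>
  </property>
</configuration>
```

With this in place, HiveContext talks to the shared metastore service and the SessionHiveMetaStoreClient is instantiated against it rather than a local Derby database.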
Re: Performance tuning in Spark SQL.
Please see the link below for the available options: https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#performance-tuning

For example, reducing spark.sql.shuffle.partitions from 200 to 10 could improve performance significantly.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-tuning-in-Spark-SQL-tp21871p23576.html
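A minimal Spark 1.3 sketch of the tuning options mentioned above; the table name is a hypothetical placeholder:

```scala
// lower the number of post-shuffle partitions for small datasets
sqlContext.setConf("spark.sql.shuffle.partitions", "10")

// cache a table that is reused across queries, in columnar format
sqlContext.cacheTable("my_table")  // "my_table" is an illustrative name
```

As the programming guide notes, the right value for spark.sql.shuffle.partitions depends on data volume: too few partitions underutilizes the cluster on large shuffles, while too many adds scheduling overhead on small ones.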
DataFrame registerTempTable Concurrent Access
Hi, Spark users,

I'm trying to update a registered DataFrame temp table by invoking DataFrame.registerTempTable again and again. Assume I have a DataFrame temp table "table1"; below is the concurrent logic:

sqlContext.table("table1").filter(***).unionAll(dummy1DF).registerTempTable("table1")
sqlContext.table("table1").filter(***).unionAll(dummy2DF).registerTempTable("table1")

I expect that after the logic above completes, the final temp table "table1" contains all the changes. Is this a correct expectation even without a proper locking system to order the execution? Such updating logic does not seem like functional programming, correct?

Thanks a lot
prosp4300