Re: Spark query performance of cached data affected by RDD lineage
Thanks to all for the quick replies; they helped a lot. To answer a few of the follow-up questions:

> 1. How did you fix this performance issue, which I gather was done programmatically?

The main problem in my original code was that the 'if' branch was not being executed when it should have been. That branch creates a new data frame directly from the contents of the Cassandra data source, throwing away the RDD lineage of the existing data frame. Because the 'else' branch was incorrectly executing instead, the new data frame was being built from the existing data frame again and again, extending the entire lineage with each iteration and degrading query times. Fixing the condition stopped the lineage from constantly growing with each refresh.

> 2. In your code have you set spark.conf.set("spark.sql.adaptive.enabled", "true")

Yes, this is already set.

> 3. Depending on the size of your data, both source and new data, do you have any indication that your data in the global temporary view is totally cached? This should show in the storage tab in the UI. If you have data on disk then this will affect the performance.

Yes, the cached temporary view appears in the storage tab with the expected amount of memory usage, although it is difficult to inspect the specific contents of that memory cache.

> You could try checkpointing occasionally and see if that helps

This was very helpful, as I was not familiar with the Spark checkpointing feature. In our application we don't need fault-tolerant dataframes/RDDs, so I inserted localCheckpoint() calls in the code just before the uncaching and re-caching steps. This made all queries run faster, some improved immediately by a factor of 3!

Based on these tests, it is clear that RDD lineage can have a major effect on the performance of Spark applications, whether or not the data has been cached in memory. Thanks again for the good advice!
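In Scala terms, the inserted step looks roughly like this. This is a sketch only, not the actual application code: names mirror the pseudo-code from the original post, and whether cacheTable needs a "global_temp." prefix depends on how the view is registered.

// Sketch, assuming combinedView / combinedViewName as in the original pseudo-code.
val truncated = combinedView.localCheckpoint(true) // eager: materializes now and
                                                   // truncates the RDD lineage;
                                                   // not fault tolerant, which is
                                                   // acceptable for rebuildable data
sparkSession.catalog.uncacheTable(combinedViewName)
truncated.createOrReplaceGlobalTempView(combinedViewName)
sparkSession.catalog.cacheTable(combinedViewName)  // may need a "global_temp." prefix
truncated.count()                                  // realize the new cache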
Re: Spark query performance of cached data affected by RDD lineage
> Do Spark SQL queries depend directly on the RDD lineage even when the final results have been cached?

Yes: if one of the nodes holding cached data later fails, Spark would need to rebuild that state somehow. You could try checkpointing occasionally and see if that helps.

On Sat, 22 May 2021, 11:44 pm Fred Yeadon, wrote:
> Hi all,
>
> Working on a complex Spark 3.0.1 application, I noticed some unexpected
> Spark behavior recently that I am hoping someone can explain.
> [...]
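Concretely, occasional checkpointing looks roughly like this in Scala. A sketch only: the checkpoint directory is an example path, and combinedView stands for the data frame from the original post.

spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // example path
val snapshot = combinedView.checkpoint() // writes partitions to the checkpoint dir
                                         // and cuts the lineage; unlike
                                         // localCheckpoint(), survives executor loss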
Re: Spark query performance of cached data affected by RDD lineage
Hi Fred,

You said you managed to fix the problem somehow and have attributed it to issues with RDD lineage. A few things come to my mind:

1. How did you fix this performance issue, which I gather was done programmatically?
2. In your code have you set spark.conf.set("spark.sql.adaptive.enabled", "true")?
3. Depending on the size of your data, both source and new data, do you have any indication that your data in the global temporary view is totally cached? This should show in the storage tab in the UI. If you have data on disk then this will affect the performance.
4. What is the output of print(rdd.toDebugString())?

I doubt this issue is caused by RDD lineage adding additional steps that are not required.

HTH

Mich

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

On Sat, 22 May 2021 at 23:44, Fred Yeadon wrote:
> Hi all,
>
> Working on a complex Spark 3.0.1 application, I noticed some unexpected
> Spark behavior recently that I am hoping someone can explain.
> [...]
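For question 4, a quick way to pull that from the Scala shell is sketched below; it assumes the combined view is addressable as "myView".

val df = spark.table("myView")
println(df.rdd.toDebugString)
// A lineage that grows with every refresh shows up here as an ever-longer
// chain of MapPartitionsRDD / UnionRDD entries.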
Spark query performance of cached data affected by RDD lineage
Hi all,

Working on a complex Spark 3.0.1 application, I noticed some unexpected Spark behavior recently that I am hoping someone can explain. The application is Java with many large classes, but I have tried to describe the essential logic below.

During periodic refresh runs, the application extracts data from Cassandra database tables, filters and combines them into Spark data frames that are cached and registered as views in the global temporary DB. The data frames that were previously there are uncached and replaced by the new ones. Remote clients can then query the views through Spark SQL.

This refresh process runs in multiple threads that build each new Spark data frame progressively, reading one Cassandra table, filtering out the old contents of the view and adding the new Cassandra contents with a union operation. Each thread un-caches the old data frame and caches the new one, then runs a count() action to realize the previous transformations. Below is some pseudo-code for this multi-threaded logic.

// Read Cassandra table using Spark Cassandra Connector
Dataset<Row> data = sparkSession.read().format("org.apache.spark.sql.cassandra").options(params).load();

// Combine data into single Spark view
Dataset<Row> combinedView = null;
String combinedViewName = "myView";
if (<...>) {
   // Start a new combined view from the contents of the source table
   combinedView = data;
} else {
   // Read the existing combined view to further extend it
   combinedView = sparkSession.table(combinedViewName);
   ...
   // Remove stale data with filter
   combinedView = combinedView.filter(<...>);
   ...
   // Add new data
   combinedView = combinedView.union(data);
}

// Re-cache modified combined view
sparkSession.catalog().uncacheTable(combinedViewName);
combinedView.createOrReplaceGlobalTempView(combinedViewName);
sparkSession.catalog().cacheTable(combinedViewName);
combinedView.count();

The application works, but I recently fixed a bug where Spark SQL queries were running incrementally slower after each refresh, resulting in steady performance degradation. After investigation, I found that the check logic above was not correct, causing a new combined data frame to build on the lineage of the old data frame RDD that it is replacing. The Spark physical plan of many queries was becoming larger and larger because of 'filter' and 'union' transformations being added to the same data frame. I am not yet very familiar with Spark query plans, but below are fragments of a physical plan before and after a refresh that highlight the differences.

Before refresh
==============

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- *(8) Project ...
   ...
   +- *(1) Project ...
      +- *(1) Filter ...
         +- Scan In-memory table ...
            +- InMemoryRelation ...
               +- Exchange RoundRobinPartitioning(2), ...
                  +- Union
                     :- Exchange RoundRobinPartitioning(2), ...
                     :  +- Union
                     :     :- *(1) Project ...
                     :     :  +- *(1) Filter ...
                     :     :     +- BatchScan ... Cassandra Scan: ...
                     :     :- *(2) Project ...
                     :     :  +- *(2) Filter ...
                     :     :     +- BatchScan ... Cassandra Scan: ...
                     :     :- *(3) Project ...
                     :     :  +- *(3) Filter ...
                     :     :     +- BatchScan ... Cassandra Scan: ...
                     :     :- *(8) Project ...
                     :     :  +- *(4) Filter ...
                     :     :     +- BatchScan ... Cassandra Scan: ...

After refresh
=============

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- *(8) Project ...
   ...
   +- *(1) Project ...
      +- *(1) Filter ...
         +- Scan In-memory table ...
            +- InMemoryRelation ...
               +- Exchange RoundRobinPartitioning(2), ...
                  +- Union
                     :- Exchange RoundRobinPartitioning(2), ...
                     :  +- Union
NEW LINE --->        :     :- Exchange RoundRobinPartitioning(2), ...
NEW LINE --->        :     :  +- Union
                     :     :     :- *(1) Project ...
                     :     :     :  +- *(1) Filter ...
                     :     :     ...
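A lightweight way to watch this plan growth between refreshes, sketched in Scala (the view name is assumed, and global temp views are usually addressed with a "global_temp." prefix):

val view = spark.table("myView")
val planLines = view.queryExecution.optimizedPlan.treeString.split("\n").length
println(s"optimized plan has $planLines lines")
// If the lineage keeps leaking, this number climbs after every refresh.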
Re: [Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop
scala> def getRootRdd(rdd: RDD[_]): RDD[_] = {
     |   if (rdd.dependencies.size == 0) rdd else getRootRdd(rdd.dependencies(0).rdd)
     | }
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]

scala> val rdd = spark.read.parquet("/Users/russellspitzer/Temp/local").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] at rdd at <console>:24

scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24

scala> scan.partitions.map(scan.preferredLocations)
res8: Array[Seq[String]] = Array(WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray())

I define a quick traversal to get the source RDD for the dataframe operation. I make the read dataframe and get the RDD out of it. I traverse the RDD's dependencies to get the FileScan. I then apply the scan's preferredLocations method to each partition. You can see the result here is that none of my partitions have a preferred location, so they will all be run at "Any". This is because I'm using my local file system, which never reports a preferred location, so even though the scheduler will report "ANY" in this case, the tasks are actually node local.

> On Apr 13, 2021, at 8:37 AM, Mohamadreza Rostami wrote:
>
> Thanks for your response.
> I think my HDFS-spark cluster is co-localized because I have a spark worker
> per each datanode; in other words, I installed the spark workers on
> datanodes, and I think that's the point. So why does this simple query on a
> co-localized HDFS-spark cluster run at "Any" locality level?
> Is there any way to figure out which IP or hostname of the data-nodes is
> returned from the name-node to Spark? Or can you offer me a debug approach?
> [...]
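To the debugging question, one rough approach is to compare the hosts Spark knows about with the locations HDFS reports. This is a sketch building on the getRootRdd helper above; the path comes from the question, and the host parsing is an assumption.

// Hosts of the registered executors ("host:port" keys; the driver is included)
val executorHosts = spark.sparkContext.getExecutorMemoryStatus.keys
  .map(_.split(":")(0)).toSet

// Preferred locations reported by the scan's partitions
val scan = getRootRdd(spark.read.parquet("hdfs://test/parquets/*").rdd)
val preferredHosts = scan.partitions.flatMap(scan.preferredLocations).toSet

println(s"executors: $executorHosts")
println(s"preferred: $preferredHosts")
println(s"overlap:   ${preferredHosts.intersect(executorHosts)}") // empty => ANY

NODE_LOCAL is only possible where these two sets use matching name strings, so a mismatch between IPs and hostnames shows up immediately here.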
Re: [Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop
Data locality can only occur if the Spark Executor IP address string matches the preferred location returned by the file system. So this job would only have local tasks if the datanode replicas for the files in question had the same ip address as the Spark executors you are using. If they don't, then the scheduler falls back to assigning read tasks to the first executor available with locality level "any".

So unless you have that HDFS - Spark Cluster co-localization I wouldn't expect this job to run at any other locality level than ANY.

> On Apr 13, 2021, at 3:47 AM, Mohamadreza Rostami wrote:
>
> I have a Hadoop cluster that uses Apache Spark to query parquet files saved
> on Hadoop.
> [...]
[Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop
I have a Hadoop cluster that uses Apache Spark to query parquet files saved on Hadoop. For example, I'm using the following PySpark code to find a word in the parquet files:

df = spark.read.parquet("hdfs://test/parquets/*")
df.filter(df['word'] == "jhon").show()

After running this code, I go to the Spark application UI, stages tab, and I see that the locality level summary is set to Any. In contrast, because of this query's nature, it should run locally, at NODE_LOCAL locality level at least. When I check the network IO of the cluster while running this, I find that the query uses the network (network IO increases while the query is running). The strange part of this situation is that the number shown in the Spark UI's shuffle section is very small.

How can I find out the root cause of this problem and solve it?

Link on stackoverflow.com: https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
Re: RDD Partitions on HDFS file in Hive on Spark Query
Hi Ayan, thanks for the explanation. I am aware of compression codecs. How is the locality level set? Is it done by Spark or by YARN?

Please let me know.

Thanks,
Yesh

On Nov 22, 2016 5:13 PM, "ayan guha" wrote:
> Hi
>
> RACK_LOCAL = task running on the same rack but not on the same node where
> the data is.
> NODE_LOCAL = task and data are co-located. Probably you were looking for
> this one?
> [...]
Re: RDD Partitions on HDFS file in Hive on Spark Query
Hi

RACK_LOCAL = task running on the same rack but not on the same node where the data is.
NODE_LOCAL = task and data are co-located. Probably you were looking for this one?

GZIP - the read goes through the GZIP codec, but because it is non-splittable, you can have at most 1 task reading a gzip file. The content of the gzip file may still be spread across multiple nodes. Ex: a GZIP file of say 1 GB with a block size of 256 MB (ie 4 blocks). Assume not all 4 blocks are on the same data node. When you start reading the gzip file, 1 task will be assigned. It will read local blocks if available, and it will read remote blocks (streaming read). While reading the stream, the gzip codec will uncompress the data. This really is not a Spark thing, but a Hadoop input format discussion.

HTH?

On Wed, Nov 23, 2016 at 10:00 AM, yeshwanth kumar wrote:
> Hi Ayan,
>
> we have default rack topology.
> [...]

--
Best Regards,
Ayan Guha
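A small Scala sketch of that splittability effect; the paths are hypothetical:

// Non-splittable compression caps read parallelism at one task per file.
val gz = spark.read.csv("hdfs:///data/big.csv.gz")  // hypothetical path
println(gz.rdd.getNumPartitions)                    // 1: gzip is not splittable
val plain = spark.read.csv("hdfs:///data/big.csv")  // same data, uncompressed
println(plain.rdd.getNumPartitions)                 // roughly file size / block size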
Re: RDD Partitions on HDFS file in Hive on Spark Query
Hi Ayan,

we have default rack topology.

-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Tue, Nov 22, 2016 at 6:37 AM, ayan guha wrote:
> Because snappy is not splittable, a single task makes sense.
>
> Are you sure about the rack topology? Ie 225 is in a different rack than
> 227 or 228? What does your topology file say?
> [...]
Re: RDD Partitions on HDFS file in Hive on Spark Query
Because snappy is not splittable, a single task makes sense.

Are you sure about the rack topology? Ie 225 is in a different rack than 227 or 228? What does your topology file say?

On 22 Nov 2016 10:14, "yeshwanth kumar" wrote:
> Thanks for your reply,
>
> i can definitely change the underlying compression format,
> but i am trying to understand the Locality Level:
> why did the executor run on a different node, where the blocks are not
> present, when the Locality Level is RACK_LOCAL?
> [...]
Re: RDD Partitions on HDFS file in Hive on Spark Query
Thanks for your reply,

I can definitely change the underlying compression format, but I am trying to understand the locality level: why did the executor run on a different node, where the blocks are not present, when the locality level is RACK_LOCAL?

Can you shed some light on this?

Thanks,
Yesh

-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke wrote:
> Use as a format orc, parquet or avro because they support any compression
> type with parallel processing.
> [...]
Re: RDD Partitions on HDFS file in Hive on Spark Query
Use as a format orc, parquet or avro, because they support any compression type with parallel processing. Alternatively, split your file into several smaller ones. Another alternative would be bzip2 (but slower in general) or Lzo (usually it is not included by default in many distributions).

> On 21 Nov 2016, at 23:17, yeshwanth kumar wrote:
>
> Hi,
>
> we are running Hive on Spark, we have an external table over a snappy
> compressed csv file of size 917.4 M
> [...]
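A sketch of that conversion in Scala; the paths and the header option are assumptions:

// Rewrite the CSV as Parquet. Snappy inside Parquet stays splittable because
// compression is applied per column chunk, not across the whole file.
spark.read.option("header", "true")
  .csv("hdfs:///warehouse/mytable_csv")          // hypothetical source path
  .write.option("compression", "snappy")
  .parquet("hdfs:///warehouse/mytable_parquet")  // hypothetical target path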
Re: RDD Partitions on HDFS file in Hive on Spark Query
Try changing compression to bzip2 or lzo. For reference - http://comphadoop.weebly.com

Thanks,
Aniket

On Mon, Nov 21, 2016, 10:18 PM yeshwanth kumar wrote:
> Hi,
>
> we are running Hive on Spark, we have an external table over a snappy
> compressed csv file of size 917.4 M
> [...]
RDD Partitions on HDFS file in Hive on Spark Query
Hi,

we are running Hive on Spark. We have an external table over a snappy compressed csv file of size 917.4 M, and the HDFS block size is set to 256 MB.

As per my understanding, if I run a query over that external table, it should launch 4 tasks, one for each block. But I am seeing one executor and one task processing the whole file.

Trying to understand the reason behind this, I went one step further to look at the block locality. When I get the block locations for that file, I find:

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]]

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]

In the Spark UI I see the locality level is RACK_LOCAL for that task. If it is RACK_LOCAL then it should run either on node 10.11.0.226 or 10.11.0.228, because these 2 nodes have all four blocks needed for the computation, but the executor is running on 10.11.0.225.

My theory is not applying anywhere. Please help me understand how Spark/YARN calculates the number of executors/tasks.

Thanks,
-Yeshwanth
Re: HIVE Query 25x faster than SPARK Query
Hi,

We do have a dimension table with around a few hundred columns, from which we need only a few columns to join with the main fact table, which has a few million rows. I do not know how one-off this case sounds, but since I have been working in data warehousing it sounds like a fairly common use case.

Spark in local mode will be way faster compared to Spark running on HADOOP. I have a system with 64 GB RAM and SSD, and its performance on local-mode Spark is way better.

Did your join include the same number of columns and rows for the dimension table?

Regards,
Gourav Sengupta

On Thu, Jun 16, 2016 at 9:35 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> sounds like this is a one off case.
>
> Do you have any other use case where Hive on MR outperforms Spark?
> [...]
Re: HIVE Query 25x faster than SPARK Query
sounds like this is a one off case.

Do you have any other use case where Hive on MR outperforms Spark?

I did some tests on a 1 billion row table, getting the selectivity of a column using Hive on MR, Hive on Spark engine, and Spark running in local mode (to keep it simple).

Hive 2, Spark 1.6.1

Results:

Hive with map-reduce --> 18 minutes
Hive on Spark engine --> 6 minutes
Spark                --> 2 minutes

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 16 June 2016 at 08:43, Jörn Franke <jornfra...@gmail.com> wrote:
> I agree here.
>
> However it depends always on your use case!
>
> Best regards
> [...]
Re: HIVE Query 25x faster than SPARK Query
I agree here.

However it depends always on your use case!

Best regards

> On 16 Jun 2016, at 04:58, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi Mahender,
>
> please ensure that for dimension tables you are enabling the broadcast
> method.
> [...]
Re: HIVE Query 25x faster than SPARK Query
Hi Mahender,

Please ensure that you are enabling the broadcast method for dimension tables. You should see surprising gains of around 12x.

Overall I think that SPARK cannot figure out whether to scan all the columns in a table or just the ones which are being used, and that is causing this issue.

When you start using HIVE with ORC and TEZ you will see some amazing results that leave SPARK way behind. So you pretty much need to have your data in memory to match the performance claims of SPARK, and the advantage you are getting in that case comes not from SPARK's algorithms but just from fast I/O out of RAM. The advantage of SPARK is that it brings analytics, querying, and streaming frameworks together in an accessible way.

If you follow the optimisations mentioned in this link, you hardly have any reason left for using SPARK SQL: http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And imagine being able to do all of that without machines which require huge RAM; in short, you achieve those performance gains on the commodity low-cost systems around which HADOOP was designed.

I think that Hortonworks is giving stiff competition here :)

Regards,
Gourav Sengupta

On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam <mahender.bigd...@outlook.com> wrote:
> [...]
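[Editor's note: to make the broadcast explicit rather than relying on the planner's size estimates, Spark 1.5+ exposes a broadcast hint on DataFrames. A minimal sketch, assuming the thread's tables are visible to the HiveContext as "A" (large) and "B" (small dimension); the variable names are illustrative:

import org.apache.spark.sql.functions.broadcast

// Large fact table and small dimension table from the thread's example.
val factDF = sqlContext.table("A")
val dimDF  = sqlContext.table("B")

// broadcast() hints the planner to ship dimDF to every executor,
// replacing the shuffle join with a map-side broadcast join.
val joined = factDF.join(broadcast(dimDF), factDF("PK") === dimDF("FK"))
]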
Re: HIVE Query 25x faster than SPARK Query
+1. We also see performance degradation when comparing Spark SQL with Hive. We have a table of 260 columns and have executed the same query in Hive and in SPARK. In Hive it takes 66 sec for 1 GB of data, whereas in Spark it takes 4 mins.

On 6/9/2016 3:19 PM, Gavin Yue wrote:
> Could you print out the SQL execution plan? My guess is that it is about the broadcast join.
> [...]
Re: HIVE Query 25x faster than SPARK Query
Hi Gavin,

For the first time someone is responding to this thread with a meaningful conversation - thanks for that. Okay, I did not tweak the spark.sql.autoBroadcastJoinThreshold parameter, and since the cached table was around 75 MB, I do not think that a broadcast join was used. But I will surely be excited to see whether I am going wrong here, and will post the results of explain().

Thanks a ton once again.

Hi Ted, is there any way you can throw some light on this before I post this in a blog?

Regards,
Gourav Sengupta

On Fri, Jun 10, 2016 at 7:22 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
> Yes, because in the second query you did a (select PK from A) A. I guess the subquery makes the results much smaller and enables the broadcast join, so it is much faster.
> [...]
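[Editor's note: for reference, the planner only auto-broadcasts a table whose estimated size is below spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB, so a ~75 MB table would indeed not qualify. A sketch of raising the cutoff above the table size; the 100 MB figure is illustrative:

// Raise the automatic-broadcast cutoff above the ~75 MB table size.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)
]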
Re: HIVE Query 25x faster than SPARK Query
Yes, because in the second query you did a (select PK from A) A. I guess the subquery makes the results much smaller and enables the broadcast join, so it is much faster.

You could use explain() to check the execution plan.

On Fri, Jun 10, 2016 at 1:41 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> Hi,
>
> I think if we try to see why Query 2 is faster than Query 1 then all the answers will be given without beating around the bush. That is the right way to find out what is happening and why.
>
> Regards,
> Gourav
> [...]
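[Editor's note: a sketch of inspecting the plan for the thread's first query with DataFrame.explain, which exists in Spark 1.6; in the printed physical plan, look for BroadcastHashJoin versus SortMergeJoin:

val q1 = sqlContext.sql(
  "select A.PK, B.FK from A left outer join B on (A.PK = B.FK) where B.FK is not null")

// Passing true prints the logical plans as well as the physical plan.
q1.explain(true)
]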
Re: HIVE Query 25x faster than SPARK Query
Hi,

I think if we try to see why Query 2 is faster than Query 1 then all the answers will be given without beating around the bush. That is the right way to find out what is happening and why.

Regards,
Gourav

On Thu, Jun 9, 2016 at 11:19 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
> Could you print out the SQL execution plan? My guess is that it is about the broadcast join.
> [...]
Re: HIVE Query 25x faster than SPARK Query
Could you print out the SQL execution plan? My guess is that it is about the broadcast join.

> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> [...]
Re: HIVE Query 25x faster than SPARK Query
I still fail to see how Hive can be orders of magnitude faster than Spark. Assuming that Hive is using map-reduce, I cannot see a real case for Hive being faster, at least under normal operations.

Don't get me wrong, I am a fan of Hive. The performance of Hive comes from deploying an execution engine (mr, spark, tez) to do the execution of the work. If we leave that aside for now, the other influencing factor would be the Hive optimizer compared to the Spark optimizer.

If I go back to the thread owner's point and quote:

"Query1 is almost 25x faster in HIVE than in SPARK. What is happening here and is there a way we can optimize the queries in SPARK without the obvious hack in Query2.
Table A is 533 columns x 24 million rows and Table B has 2 columns x 3 million rows. Both files are single gzipped CSV files.
Both table A and B are external tables in AWS S3, created in HIVE and accessed through SPARK using HiveContext.
EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using allowMaximumResource allocation and node types are c3.4xlarge)."

To take it further and make some reasonable deductions, with gzipped files:

1. Hive will not be able to split the CSV files into chunks/blocks and run multiple maps in parallel.
2. Spark will give you an RDD with only 1 partition (as of 0.9.0), because gzipped files are not splittable. If you do not repartition the RDD, any operations on that RDD will be limited to a single core.

So with gzipped files both Hive and Spark have issues. Also, neither table has a very large number of rows. With Spark, were temporary tables deployed? That IMO does help performance. It is possible that Spark has been spilling to disk. To deduce whether Spark was indeed spilling to disk, we really need the output from the GUI showing jobs, stages and spillage (see TungstenAggregate).

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

On 9 June 2016 at 21:40, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> [...]
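[Editor's note: a minimal sketch of point 2 above; the S3 path and the partition count of 32 are illustrative, not from the thread:

// A single gzipped file yields an RDD with one partition.
val raw = sc.textFile("s3://my-bucket/tableA/data.csv.gz")
println(raw.partitions.length)   // 1 for one gzipped file

// Redistribute the rows so later stages can use all cores.
val split = raw.repartition(32)
]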
Re: HIVE Query 25x faster than SPARK Query
Hi Stephen,

How can a single gzipped CSV file be partitioned, and who partitions tables based on the primary key in Hive? If you read the environment section you will see that all the required details are mentioned.

As far as I understand, Hive does work 25x faster (in these particular cases) and around 100x faster (when we are using TEZ) when compared to SPARK.

It will be interesting to see whether Ted includes these findings while they are benchmarking SPARK. This is a very typical and general use case.

Regards,
Gourav

On Thu, Jun 9, 2016 at 5:11 PM, Stephen Boesch <java...@gmail.com> wrote:
> Out of curiosity, are the tables partitioned on a.pk and b.fk? Hive might be using copartitioning in that case; it is one of Hive's strengths.
> [...]
Re: HIVE Query 25x faster than SPARK Query
Out of curiosity, are the tables partitioned on a.pk and b.fk? Hive might be using copartitioning in that case; it is one of Hive's strengths.

2016-06-09 7:28 GMT-07:00 Gourav Sengupta <gourav.sengu...@gmail.com>:
> [...]
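[Editor's note: for context, co-partitioning in Hive is usually obtained by bucketing both tables on the join key. A hypothetical sketch; the column types and bucket counts are illustrative, and the counts must match for a bucket map join:

CREATE TABLE A (PK BIGINT) CLUSTERED BY (PK) INTO 32 BUCKETS;
CREATE TABLE B (FK BIGINT) CLUSTERED BY (FK) INTO 32 BUCKETS;
-- Enable the bucket map join optimization for the session:
SET hive.optimize.bucketmapjoin=true;
]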
Re: HIVE Query 25x faster than SPARK Query
Hi Mich,

Does Hive not use map-reduce? I thought it did. And since I am running the queries in EMR 4.6, HIVE is not using TEZ.

Regards,
Gourav

On Thu, Jun 9, 2016 at 3:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Are you using map-reduce with Hive?
> [...]
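[Editor's note: one way to settle which engine Hive is actually using is to check the session setting from the Hive CLI or beeline; hive.execution.engine is the standard property and its values are mr, tez or spark:

-- Show the engine the current session will use:
SET hive.execution.engine;
-- Switch engines where that runtime is installed, e.g.:
SET hive.execution.engine=tez;
]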
Re: HIVE Query 25x faster than SPARK Query
Are you using map-reduce with Hive?

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

On 9 June 2016 at 15:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> [...]
HIVE Query 25x faster than SPARK Query
Hi,

Query1 is almost 25x faster in HIVE than in SPARK. What is happening here, and is there a way we can optimize the queries in SPARK without the obvious hack in Query2?

---
ENVIRONMENT:
---
> Table A is 533 columns x 24 million rows and Table B has 2 columns x 3 million rows. Both files are single gzipped CSV files.
> Both table A and B are external tables in AWS S3, created in HIVE and accessed through SPARK using HiveContext.
> EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using allowMaximumResource allocation; node types are c3.4xlarge).

--
QUERY1:
--
select A.PK, B.FK
from A
left outer join B on (A.PK = B.FK)
where B.FK is not null;

This query takes 4 mins in HIVE and 1.1 hours in SPARK.

--
QUERY 2:
--
select A.PK, B.FK
from (select PK from A) A
left outer join B on (A.PK = B.FK)
where B.FK is not null;

This query takes 4.5 mins in SPARK.

Regards,
Gourav Sengupta
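[Editor's note: for readers hitting the same issue, the Query 2 "hack" can also be expressed directly in the DataFrame API, pruning A to the single needed column before the join. A sketch assuming the tables are visible to the HiveContext:

// Prune A to one column up front so Spark never scans all 533 columns.
val a = sqlContext.table("A").select("PK")
val b = sqlContext.table("B")

// Mirror the SQL: left outer join, then keep only matched rows.
val result = a.join(b, a("PK") === b("FK"), "left_outer")
              .where(b("FK").isNotNull)
]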
Re: hive on spark query error
Seems like you have "hive.server2.enable.doAs" enabled; you can either disable it, or configure hs2 so that the user running the service ("hadoop" in your case) can impersonate others.

See: https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-common/Superusers.html

On Fri, Sep 25, 2015 at 10:33 AM, Garry Chen wrote:
> 2015-09-25 13:31:16,245 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - ERROR: org.apache.hadoop.security.authorize.AuthorizationException: User: hadoop is not allowed to impersonate HIVEAPP

-- Marcelo
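[Editor's note: a sketch of the second option following the Superusers doc linked above. In core-site.xml on the cluster, the "hadoop" service user is allowed to impersonate others; "*" is wide open, so narrow the hosts and groups in a real deployment:

<!-- core-site.xml: let the "hadoop" user (running HiveServer2 here) impersonate others. -->
<property>
  <name>hadoop.proxyuser.hadoop.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hadoop.groups</name>
  <value>*</value>
</property>
]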
hive on spark query error
Hi All,

I am following https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started? to set up Hive on Spark. After setup/configuration everything starts up and I am able to show tables, but when executing a SQL statement within beeline I get an error. Please help, and thank you very much.

Cluster environment (3 nodes): hadoop-2.7.1, spark-1.4.1-bin-hadoop2.6, zookeeper-3.4.6, apache-hive-1.2.1-bin

Error from hive log:

2015-09-25 11:51:03,123 INFO [HiveServer2-Handler-Pool: Thread-50]: client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting impersonation of oracle
2015-09-25 11:51:03,133 INFO [HiveServer2-Handler-Pool: Thread-50]: client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit --proxy-user oracle --properties-file /tmp/spark-submit.840692098393819749.properties --class org.apache.hive.spark.client.RemoteDriver /u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host ip-10-92-82-229.ec2.internal --remote-port 40476 --conf hive.spark.client.connect.timeout=1000 --conf hive.spark.client.server.connect.timeout=9 --conf hive.spark.client.channel.log.level=null --conf hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 --conf hive.spark.client.secret.bits=256
2015-09-25 11:51:03,867 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.server.connect.timeout=9
2015-09-25 11:51:03,868 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.rpc.threads=8
2015-09-25 11:51:03,868 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.connect.timeout=1000
2015-09-25 11:51:03,868 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.secret.bits=256
2015-09-25 11:51:03,868 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.rpc.max.size=52428800
2015-09-25 11:51:03,876 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Error: Master must start with yarn, spark, mesos, or local
2015-09-25 11:51:03,876 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Run with --help for usage help or --verbose for debug output
2015-09-25 11:51:03,885 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - 15/09/25 11:51:03 INFO util.Utils: Shutdown hook called
2015-09-25 11:51:03,889 WARN [Driver]: client.SparkClientImpl (SparkClientImpl.java:run(427)) - Child process exited with code 1.
RE: hive on spark query error
Yes, you are right. I made the change and also linked hive-site.xml into the Spark conf directory. Rerunning the SQL, I am getting this error in hive.log:

2015-09-25 13:31:14,750 INFO [HiveServer2-Handler-Pool: Thread-125]: client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting impersonation of HIVEAPP
2015-09-25 13:31:14,750 INFO [HiveServer2-Handler-Pool: Thread-125]: client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit --executor-memory 512m --proxy-user HIVEAPP --properties-file /tmp/spark-submit.4348738410387344124.properties --class org.apache.hive.spark.client.RemoteDriver /u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host ip-10-92-82-229.ec2.internal --remote-port 48481 --conf hive.spark.client.connect.timeout=1000 --conf hive.spark.client.server.connect.timeout=9 --conf hive.spark.client.channel.log.level=null --conf hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 --conf hive.spark.client.secret.bits=256
2015-09-25 13:31:15,473 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.server.connect.timeout=9
2015-09-25 13:31:15,473 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.rpc.threads=8
2015-09-25 13:31:15,474 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.connect.timeout=1000
2015-09-25 13:31:15,474 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.secret.bits=256
2015-09-25 13:31:15,474 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: hive.spark.client.rpc.max.size=52428800
2015-09-25 13:31:15,718 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - 15/09/25 13:31:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-09-25 13:31:16,063 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - 15/09/25 13:31:16 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2015-09-25 13:31:16,245 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - ERROR: org.apache.hadoop.security.authorize.AuthorizationException: User: hadoop is not allowed to impersonate HIVEAPP
2015-09-25 13:31:16,248 INFO [stderr-redir-1]: client.SparkClientImpl (SparkClientImpl.java:run(569)) - 15/09/25 13:31:16 INFO util.Utils: Shutdown hook called
2015-09-25 13:31:16,265 WARN [Driver]: client.SparkClientImpl (SparkClientImpl.java:run(427)) - Child process exited with code 1.

-----Original Message-----
From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Friday, September 25, 2015 1:12 PM
To: Garry Chen <g...@cornell.edu>
Cc: Jimmy Xiang <jxi...@cloudera.com>; user@spark.apache.org
Subject: Re: hive on spark query error

> [...]
RE: hive on spark query error
In spark-defaults.conf, spark.master is spark://hostname:7077. In hive-site.xml it is set to just the bare hostname: spark.master = hostname

From: Jimmy Xiang [mailto:jxi...@cloudera.com]
Sent: Friday, September 25, 2015 1:00 PM
To: Garry Chen <g...@cornell.edu>
Cc: user@spark.apache.org
Subject: Re: hive on spark query error

> Error: Master must start with yarn, spark, mesos, or local

What's your setting for spark.master?

On Fri, Sep 25, 2015 at 9:56 AM, Garry Chen <g...@cornell.edu> wrote:
> [...]
Re: hive on spark query error
On Fri, Sep 25, 2015 at 10:05 AM, Garry Chen wrote:
> In spark-defaults.conf, spark.master is spark://hostname:7077. In hive-site.xml it is set to just the bare hostname: spark.master = hostname

That's not a valid value for spark.master (as the error indicates). You should set it to "spark://hostname:7077", as you have it in spark-defaults.conf (or perhaps remove the setting from hive-site.xml; I think Hive will honor your spark-defaults.conf).

-- Marcelo
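[Editor's note: for completeness, a sketch of the corrected property as it would appear in hive-site.xml; "hostname" stands in for the actual master host:

<!-- hive-site.xml: spark.master must be a full master URL, not a bare hostname. -->
<property>
  <name>spark.master</name>
  <value>spark://hostname:7077</value>
</property>
]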
Re: Spark query
Try the spark-datetime package: https://github.com/SparklineData/spark-datetime
Follow this example https://github.com/SparklineData/spark-datetime#a-basic-example to get the different attributes of a DateTime.

On Wed, Jul 8, 2015 at 9:11 PM, prosp4300 <prosp4...@163.com> wrote:
> As mentioned in the Spark SQL programming guide, Spark SQL supports Hive UDFs. Please take a look at the builtin date functions of Hive below; getting day of year should be as simple as in an existing RDBMS: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions
>
> At 2015-07-09 12:02:44, Ravisankar Mani <rrav...@gmail.com> wrote:
>> Hi everyone, I can't get 'day of year' when using a Spark query. Can you suggest any way to get the day of the year? Regards, Ravi
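[Editor's note: if a HiveContext is available, one route via Hive's builtins is date_format, which follows java.text.SimpleDateFormat patterns ('D' is day-in-year). This assumes Hive 1.2+, where date_format was added; a sketch:

-- Day of year for a literal date; should return 190 for 2015-07-09.
SELECT date_format('2015-07-09', 'D');
]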
Spark query
Hi everyone, I can't get 'day of year' when using a Spark query. Can you suggest any way to get the day of the year? Regards, Ravi
Re: Spark query
Convert the column to a column of java.sql.Timestamps. Then you can do the following:

import java.sql.Timestamp
import java.util.Calendar

def date_trunc(timestamp: Timestamp, timeField: String) = {
  val cal = Calendar.getInstance()
  cal.setTimeInMillis(timestamp.getTime())
  timeField match {
    case "hour" => cal.get(Calendar.HOUR_OF_DAY)
    case "day"  => cal.get(Calendar.DAY_OF_MONTH)
    case "doy"  => cal.get(Calendar.DAY_OF_YEAR)  // day of year, as asked above
  }
}

sqlContext.udf.register("date_trunc", date_trunc _)

On Wed, Jul 8, 2015 at 9:23 PM, Harish Butani <rhbutani.sp...@gmail.com> wrote:
> [...]
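[Editor's note: a hypothetical usage of the UDF once registered; the table and column names are illustrative:

// 'doy' selects the day-of-year branch of the UDF above.
sqlContext.sql("SELECT date_trunc(event_time, 'doy') AS day_of_year FROM events").show()
]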