Re: Spark query performance of cached data affected by RDD lineage

2021-05-24 Thread fwy
Thanks to all for the quick replies; they helped a lot.  To answer a few of
the follow-up questions ...

> 1. How did you fix this performance which I gather programmatically

The main problem in my original code was that the 'if' branch (the
first-pass check) was not being executed when it should have been.  This
'if' clause has the effect of creating a new data frame directly from the
contents of the Cassandra data source, throwing away the old RDD lineage of
the existing data frame.  By incorrectly executing the 'else' clause
instead, the new data frame was being created from the existing data frame
again and again, extending the entire lineage with each iteration and
degrading query times.  Fixing the check stopped the lineage from
constantly growing with each refresh.
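
For illustration, here is a minimal sketch of the two branches; the condition
and the filter predicate are hypothetical stand-ins for the real,
application-specific logic:

// Hypothetical sketch -- 'isFirstPassForView' stands in for the real check
if (isFirstPassForView) {
    // Intended branch: rebuild directly from the Cassandra source, so the new
    // data frame carries only the lineage of this read
    combinedView = data;
} else {
    // Branch that was incorrectly taken on every refresh: it layers another
    // filter/union on top of the previous data frame, so the lineage keeps growing
    combinedView = sparkSession.table(combinedViewName)
            .filter(staleDataPredicate)
            .union(data);
}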

> 2. In your code have you set spark.conf.set("spark.sql.adaptive.enabled",
> "true")

Yes, this is already set.

> 3. Depending on the size of your data both source and new data, do you
> have any indication that your data in global temporary view is totally
> cached. This should show in the storage tab in UI. If you have data on the
> disk then this will affect the performance

Yes, the cached temporary view is in the storage tab with the expected
amount of memory usage, although it is difficult to check the specific
contents of this memory cache.


> You could try checkpointing occasionally and see if that helps

This was very helpful, as I was not familiar with the Spark checkpointing
feature.  In our application, we don't need fault-tolerant dataframes/RDDs,
so I inserted localCheckpoint() calls in the code prior to the uncaching and
re-caching steps.  This seems to have made all queries run faster, with some
improved immediately by a factor of 3!
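
A minimal sketch of where the call fits, assuming the same structure as the
pseudo-code quoted further down in this thread:

// Truncate the lineage before replacing the cached view. localCheckpoint()
// eagerly materializes the partitions on the executors and returns a data
// frame whose plan no longer references the accumulated filters and unions.
combinedView = combinedView.localCheckpoint();

// Then re-cache the (now short-lineage) view as before
sparkSession.catalog().uncacheTable(combinedViewName);
combinedView.createOrReplaceGlobalTempView(combinedViewName);
sparkSession.catalog().cacheTable(combinedViewName);
combinedView.count();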


Based on these tests, it is clear that RDD lineage can have a major effect
on the performance of Spark applications, whether or not the data has been
cached in memory.

Thanks again for the good advice!








Re: Spark query performance of cached data affected by RDD lineage

2021-05-24 Thread Sebastian Piu
> Do Spark SQL queries depend directly on the RDD lineage even when the
> final results have been cached?

Yes. If one of the nodes holding cached data later fails, Spark would need
to rebuild that state from the lineage.

You could try checkpointing occasionally and see if that helps
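
A rough sketch of what that could look like against the data frame from the
pseudo-code below; the checkpoint directory is a hypothetical path, and a
reliable checkpoint requires one to be set:

// Reliable checkpointing writes the data to the checkpoint directory and
// truncates the lineage; localCheckpoint() is a cheaper, non-fault-tolerant variant.
sparkSession.sparkContext().setCheckpointDir("hdfs:///tmp/spark-checkpoints");
Dataset<Row> checkpointed = combinedView.checkpoint();   // eager by default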

On Sat, 22 May 2021, 11:44 pm Fred Yeadon,  wrote:

> Hi all,
>
>
> Working on a complex Spark 3.0.1 application, I noticed some unexpected
> Spark behavior recently that I am hoping someone can explain.  The
> application is Java with many large classes, but I have tried to describe
> the essential logic below.
>
> During periodic refresh runs, the application extracts data from Cassandra
> database tables, filters and combines them into Spark data frames that are
> cached and registered as views in the global temporary DB.  The data frames
> that were previously there are uncached and replaced by the new ones.
> Remote clients can then query the views through Spark SQL.
>
> This refresh process runs in multiple threads that build each new Spark
> data frame progressively, reading one Cassandra table, filtering out the
> old contents of the view and adding the new Cassandra contents with a union
> operation.  Each thread un-caches the old data frame and caches the new
> one, then runs a count() action to realize the previous transformations.
> Below is some pseudo-code for this multi-threaded logic.
>
>
>
> *
>
> // Read Cassandra table using Spark Cassandra Connector
>
> Dataset data =
> sparkSession.read().format("org.apache.spark.sql.cassandra").options(params).load();
>
>
> // Combine data into single Spark view
>
> Dataset combinedView = null;
>
> String combinedViewName = "myView";
>
> if () {
>
>// Start a new combined view from the contents of the source table
>
>combinedView = data;
>
> } else {
>
>// Read the existing combined view to further extend it
>
>combinedView = sparkSession.table(combinedViewName);
>
>…
>
>// Remove stale data with filter
>
>combinedView = combinedView.filter();
>
>…
>
>// Add new data
>
>combinedView = combinedView.union(data);
>
> }
>
>
> // Re-cache modified combined view
>
> sparkSession.catalog().uncacheTable(combinedViewName);
>
> combinedView.createOrReplaceGlobalTempView(combinedViewName);
>
> sparkSession.catalog().cacheTable(combinedViewName);
>
> combinedView.count();
>
>
>
> *
>
> The application works, but I recently fixed a bug where Spark SQL queries
> were running incrementally slower after each refresh, resulting in steady
> performance degradation.  After investigation, I found that the 'first pass through the data frame' check logic above was not correct, causing a new
> combined data frame to build on the lineage of the old data frame RDD that
> it is replacing.  The Spark physical plan of many queries was becoming
> larger and larger because of 'filter' and 'union' transformations being
> added to the same data frame.  I am not yet very familiar with Spark query
> plans, but below are fragments of a physical plan before and after a
> refresh that highlight the differences.
>
> Before refresh
>
> ===
>
> == Physical Plan ==
>
> AdaptiveSparkPlan isFinalPlan=true
>
> +- *(8) Project…
>
>…
>
>  +-*(1) Project …
>
> +- *(1) Filter …
>
> +- Scan In-memory table ….
>
> +- InMemoryRelation …
>
> +- Exchange RoundRobinPartitioning(2), …
>
>+- Union
>
>:- Exchange RoundRobinPartitioning(2), …
>
>:   +- Union
>
>:   :- *(1) Project …
>
>:   :  +- *(1) Filter …
>
>:  +- BatchScan … Cassandra
> Scan: 
>
>...
>
>:   :- *(2) Project …
>
>:   :  +- *(2) Filter …
>
>:  +- BatchScan … Cassandra
> Scan: 
>
>...
>
>:   :- *(3) Project …
>
>:   :  +- *(3) Filter …
>
>:  +- BatchScan … Cassandra
> Scan: 
>
> 

Re: Spark query performance of cached data affected by RDD lineage

2021-05-24 Thread Mich Talebzadeh
Hi Fred,

You said you managed to fix the problem and have attributed it to issues
with RDD lineage. A few things come to mind:


   1. How did you fix this performance which I gather programmatically
   2. In your code have you set spark.conf.set("spark.sql.adaptive.enabled",
   "true")
   3. Depending on the size of your data both source and new data, do you
   have any indication that your data in global temporary view is totally
   cached. This should show in the storage tab in UI. If you have data on the
   disk then this will affect the performance
   4. What is the output of print(rdd.toDebugString())
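
For point 4, something along these lines would print the lineage of the
cached view's underlying RDD (a sketch assuming the view name from the
pseudo-code below):

// Print the RDD lineage of the cached view to see how deep it has grown
Dataset<Row> view = sparkSession.table("myView");
System.out.println(view.javaRDD().toDebugString());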




I doubt this issue is caused by RDD lineage adding additional steps that
are not required.

HTH

Mich


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 22 May 2021 at 23:44, Fred Yeadon  wrote:

> Hi all,
>
>
> Working on a complex Spark 3.0.1 application, I noticed some unexpected
> Spark behavior recently that I am hoping someone can explain.  The
> application is Java with many large classes, but I have tried to describe
> the essential logic below.
>
> During periodic refresh runs, the application extracts data from Cassandra
> database tables, filters and combines them into Spark data frames that are
> cached and registered as views in the global temporary DB.  The data frames
> that were previously there are uncached and replaced by the new ones.
> Remote clients can then query the views through Spark SQL.
>
> This refresh process runs in multiple threads that build each new Spark
> data frame progressively, reading one Cassandra table, filtering out the
> old contents of the view and adding the new Cassandra contents with a union
> operation.  Each thread un-caches the old data frame and caches the new
> one, then runs a count() action to realize the previous transformations.
> Below is some pseudo-code for this multi-threaded logic.
>
>
>
> *
>
> // Read Cassandra table using Spark Cassandra Connector
>
> Dataset data =
> sparkSession.read().format("org.apache.spark.sql.cassandra").options(params).load();
>
>
> // Combine data into single Spark view
>
> Dataset combinedView = null;
>
> String combinedViewName = "myView";
>
> if () {
>
>// Start a new combined view from the contents of the source table
>
>combinedView = data;
>
> } else {
>
>// Read the existing combined view to further extend it
>
>combinedView = sparkSession.table(combinedViewName);
>
>…
>
>// Remove stale data with filter
>
>combinedView = combinedView.filter();
>
>…
>
>// Add new data
>
>combinedView = combinedView.union(data);
>
> }
>
>
> // Re-cache modified combined view
>
> sparkSession.catalog().uncacheTable(combinedViewName);
>
> combinedView.createOrReplaceGlobalTempView(combinedViewName);
>
> sparkSession.catalog().cacheTable(combinedViewName);
>
> combinedView.count();
>
>
>
> *
>
> The application works, but I recently fixed a bug where Spark SQL queries
> were running incrementally slower after each refresh, resulting in steady
> performance degradation.  After investigation, I found that the 'first pass through the data frame' check logic above was not correct, causing a new
> combined data frame to build on the lineage of the old data frame RDD that
> it is replacing.  The Spark physical plan of many queries was becoming
> larger and larger because of 'filter' and 'union' transformations being
> added to the same data frame.  I am not yet very familiar with Spark query
> plans, but below are fragments of a physical plan before and after a
> refresh that highlight the differences.
>
> Before refresh
>
> ===
>
> == Physical Plan ==
>
> AdaptiveSparkPlan isFinalPlan=true
>
> +- *(8) Project…
>
>…
>
>  +-*(1) Project …
>
> +- *(1) Filter …
>
> +- Scan In-memory table ….
>
> +- InMemoryRelation …
>
> +- Exchange RoundRobinPartitioning(2), …
>
>+- Union
>
>:- Ex

Spark query performance of cached data affected by RDD lineage

2021-05-22 Thread Fred Yeadon
Hi all,


Working on a complex Spark 3.0.1 application, I noticed some unexpected
Spark behavior recently that I am hoping someone can explain.  The
application is Java with many large classes, but I have tried to describe
the essential logic below.

During periodic refresh runs, the application extracts data from Cassandra
database tables, filters and combines them into Spark data frames that are
cached and registered as views in the global temporary DB.  The data frames
that were previously there are uncached and replaced by the new ones.
Remote clients can then query the views through Spark SQL.

This refresh process runs in multiple threads that build each new Spark
data frame progressively, reading one Cassandra table, filtering out the
old contents of the view and adding the new Cassandra contents with a union
operation.  Each thread un-caches the old data frame and caches the new
one, then runs a count() action to realize the previous transformations.
Below is some pseudo-code for this multi-threaded logic.


*

// Read Cassandra table using Spark Cassandra Connector

Dataset data =
sparkSession.read().format("org.apache.spark.sql.cassandra").options(params).load();


// Combine data into single Spark view

Dataset combinedView = null;

String combinedViewName = "myView";

if (/* first pass for this view */) {

   // Start a new combined view from the contents of the source table

   combinedView = data;

} else {

   // Read the existing combined view to further extend it

   combinedView = sparkSession.table(combinedViewName);

   …

   // Remove stale data with filter

   combinedView = combinedView.filter(/* stale-data predicate */);

   …

   // Add new data

   combinedView = combinedView.union(data);

}


// Re-cache modified combined view

sparkSession.catalog().uncacheTable(combinedViewName);

combinedView.createOrReplaceGlobalTempView(combinedViewName);

sparkSession.catalog().cacheTable(combinedViewName);

combinedView.count();


*

The application works, but I recently fixed a bug where Spark SQL queries
were running incrementally slower after each refresh, resulting in steady
performance degradation.  After investigation, I found that the 'first pass through the data frame' check logic above was not correct, causing a new
combined data frame to build on the lineage of the old data frame RDD that
it is replacing.  The Spark physical plan of many queries was becoming
larger and larger because of 'filter' and 'union' transformations being
added to the same data frame.  I am not yet very familiar with Spark query
plans, but below are fragments of a physical plan before and after a
refresh that highlight the differences.
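
One cheap way to watch this growth between refreshes is to print the plan of
the cached view after each refresh; a small sketch using the view name from
the pseudo-code above:

// With the bug present, the Union/Filter chain in the printed plans gets
// longer after every refresh
sparkSession.table(combinedViewName).explain(true);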

Before refresh

===

== Physical Plan ==

AdaptiveSparkPlan isFinalPlan=true

+- *(8) Project…

   …

 +-*(1) Project …

+- *(1) Filter …

+- Scan In-memory table ….

+- InMemoryRelation …

+- Exchange RoundRobinPartitioning(2), …

   +- Union

   :- Exchange RoundRobinPartitioning(2), …

   :   +- Union

   :   :- *(1) Project …

   :   :  +- *(1) Filter …

   :  +- BatchScan … Cassandra
Scan: 

   ...

   :   :- *(2) Project …

   :   :  +- *(2) Filter …

   :  +- BatchScan … Cassandra
Scan: 

   ...

   :   :- *(3) Project …

   :   :  +- *(3) Filter …

   :  +- BatchScan … Cassandra
Scan: 

   ...

   :   :- *(8) Project …

   :   :  +- *(4) Filter …

   :  +- BatchScan … Cassandra
Scan: 

   ...


After refresh

==


== Physical Plan ==

AdaptiveSparkPlan isFinalPlan=true

+- *(8) Project…

   …

 +-*(1) Project …

+- *(1) Filter …

+- Scan In-memory table ….

+- InMemoryRelation …

+- Exchange RoundRobinPartitioning(2), …

   +- Union

   :- Exchange RoundRobinPartitioning(2), …

   :   +- Union

NEW LINE --->   :- Exchange RoundRobinPartitioning(2), …

NEW LINE ---> :   +- Union

  :   :- *(1) Project …

  :   :  +- *(1) Filter …

  :

Re: [Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop

2021-04-13 Thread Russell Spitzer
scala> def getRootRdd(rdd: RDD[_]): RDD[_] = { if (rdd.dependencies.size == 0) rdd else getRootRdd(rdd.dependencies(0).rdd) }
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]

scala> val rdd = spark.read.parquet("/Users/russellspitzer/Temp/local").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] 
at rdd at :24

scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at :24

scala> scan.partitions.map(scan.preferredLocations)
res8: Array[Seq[String]] = Array(WrappedArray(), WrappedArray(), 
WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), 
WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray())

I define a quick traversal to get the source RDD for the dataframe operation. I 
make the read dataframe and get the RDD out of it. I traverse the RDD's 
dependencies to get the FileScanRDD. I then apply the scan's preferredLocations 
method to each partition. You can see the result here is that none of my 
partitions have a preferred location, so they will all be run at "Any". This is 
because I'm using my local file system, which never reports a preferred 
location; so even though the scheduler reports "ANY", in this case the tasks 
are actually node-local.


> On Apr 13, 2021, at 8:37 AM, Mohamadreza Rostami 
>  wrote:
> 
> Thanks for your response.
> I think my HDFS-Spark cluster is co-localized because I have a Spark worker 
> on each datanode; in other words, I installed the Spark workers on the 
> datanodes. That is why I don't understand why this simple query on a 
> co-localized HDFS-Spark cluster runs at the "Any" locality level.
> Is there any way to figure out which datanode IPs or hostnames the 
> namenode returns to Spark? Or can you suggest a debugging approach?
> 
>> On Farvardin 24, 1400 AP, at 17:45, Russell Spitzer 
>> mailto:russell.spit...@gmail.com>> wrote:
>> 
>> Data locality can only occur if the Spark Executor IP address string matches 
>> the preferred location returned by the file system. So this job would only 
>> have local tasks if the datanode replicas for the files in question had the 
>> same ip address as the Spark executors you are using. If they don't then the 
>> scheduler falls back to assigning read tasks to the first executor available 
>> with locality level "any". 
>> 
>> So unless you have that HDFS - Spark Cluster co-localization I wouldn't 
>> expect this job to run at any other locality level than ANY.
>> 
>>> On Apr 13, 2021, at 3:47 AM, Mohamadreza Rostami 
>>> mailto:mohamadrezarosta...@gmail.com>> 
>>> wrote:
>>> 
>>> I have a Hadoop cluster that uses Apache Spark to query parquet files saved 
>>> on Hadoop. For example, when i'm using the following PySpark code to find a 
>>> word in parquet files:
>>> df = spark.read.parquet("hdfs://test/parquets/* ")
>>> df.filter(df['word'] == "jhon").show()
>>> After running this code, I go to spark application UI, stages tab, I see 
>>> that locality level summery set on Any. In contrast, because of this 
>>> query's nature, it must run locally and on NODE_LOCAL locality level at 
>>> least. When I check the network IO of the cluster while running this, I 
>>> find out that this query use network (network IO increases while the query 
>>> is running). The strange part of this situation is that the number shown in 
>>> the spark UI's shuffle section is very small.
>>> How can I find out the root cause of this problem and solve that?
>>> link of stackoverflow.com <http://stackoverflow.com/> : 
>>> https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
>>>  
>>> <https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache>
> 



Re: [Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop

2021-04-13 Thread Russell Spitzer
Data locality can only occur if the Spark Executor IP address string matches 
the preferred location returned by the file system. So this job would only have 
local tasks if the datanode replicas for the files in question had the same ip 
address as the Spark executors you are using. If they don't then the scheduler 
falls back to assigning read tasks to the first executor available with 
locality level "any". 

So unless you have that HDFS - Spark Cluster co-localization I wouldn't expect 
this job to run at any other locality level than ANY.

> On Apr 13, 2021, at 3:47 AM, Mohamadreza Rostami 
>  wrote:
> 
> I have a Hadoop cluster that uses Apache Spark to query parquet files saved 
> on Hadoop. For example, when i'm using the following PySpark code to find a 
> word in parquet files:
> df = spark.read.parquet("hdfs://test/parquets/* ")
> df.filter(df['word'] == "jhon").show()
> After running this code, I go to spark application UI, stages tab, I see that 
> locality level summery set on Any. In contrast, because of this query's 
> nature, it must run locally and on NODE_LOCAL locality level at least. When I 
> check the network IO of the cluster while running this, I find out that this 
> query use network (network IO increases while the query is running). The 
> strange part of this situation is that the number shown in the spark UI's 
> shuffle section is very small.
> How can I find out the root cause of this problem and solve that?
> link of stackoverflow.com <http://stackoverflow.com/> : 
> https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
>  
> <https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache>


[Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop

2021-04-13 Thread Mohamadreza Rostami
I have a Hadoop cluster that uses Apache Spark to query parquet files saved on 
Hadoop. For example, I use the following PySpark code to find a word in the 
parquet files:
df = spark.read.parquet("hdfs://test/parquets/*")
df.filter(df['word'] == "jhon").show()
After running this code, I go to the Spark application UI, Stages tab, and I 
see that the locality level summary is set to Any. In contrast, because of 
this query's nature, it should run locally, at the NODE_LOCAL locality level 
at least. When I check the network I/O of the cluster while running this, I 
find that the query uses the network (network I/O increases while the query 
is running). The strange part of this situation is that the number shown in 
the Spark UI's shuffle section is very small.
How can I find out the root cause of this problem and solve it?
Link on stackoverflow.com:
https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache

Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread yeshwanth kumar
Hi Ayan,

Thanks for the explanation,
I am aware of compression codecs.

How is the locality level set?
Is it done by Spark or by YARN?

Please let me know,



Thanks,
Yesh




On Nov 22, 2016 5:13 PM, "ayan guha"  wrote:

Hi

RACK_LOCAL = Task running on the same rack but not on the same node where
data is
NODE_LOCAL = task and data is co-located. Probably you were looking for
this one?

GZIP - Read is through GZIP codec, but because it is non-splittable, so you
can have atmost 1 task reading a gzip file. Now, the content of gzip may be
across multiple node. Ex: GZIP file of say 1GB and block size is 256 MB (ie
4 blocks). Assume not all 4 blocks are on same data node.

When you start reading the gzip file, 1 task will be assigned. It will read
local blocks if available, and it will read remote blocks (streaming read).
While reading the stream, gzip codec will uncompress the data.

This is really is not a spark thing, but a hadoop input format
discussion

HTH?

On Wed, Nov 23, 2016 at 10:00 AM, yeshwanth kumar 
wrote:

> Hi Ayan,
>
> we have  default rack topology.
>
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Tue, Nov 22, 2016 at 6:37 AM, ayan guha  wrote:
>
>> Because snappy is not splittable, so single task makes sense.
>>
>> Are sure about rack topology? Ie 225 is in a different rack than 227 or
>> 228? What does your topology file says?
>> On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:
>>
>>> Thanks for your reply,
>>>
>>> i can definitely change the underlying compression format.
>>> but i am trying to understand the Locality Level,
>>> why executor ran on a different node, where the blocks are not present,
>>> when Locality Level is RACK_LOCAL
>>>
>>> can you shed some light on this.
>>>
>>>
>>> Thanks,
>>> Yesh
>>>
>>>
>>> -Yeshwanth
>>> Can you Imagine what I would do if I could do all I can - Art of War
>>>
>>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke 
>>> wrote:
>>>
 Use as a format orc, parquet or avro because they support any
 compression type with parallel processing. Alternatively split your file in
 several smaller ones. Another alternative would be bzip2 (but slower in
 general) or Lzo (usually it is not included by default in many
 distributions).

 On 21 Nov 2016, at 23:17, yeshwanth kumar 
 wrote:

 Hi,

 we are running Hive on Spark, we have an external table over snappy
 compressed csv file of size 917.4 M
 HDFS block size is set to 256 MB

 as per my Understanding, if i run a query over that external table , it
 should launch 4 tasks. one for each block.
 but i am seeing one executor and one task processing all the file.

 trying to understand the reason behind,

 i went one step further to understand the block locality
 when i get the block locations for that file, i found

 [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
 4a8f-be48-b0953fdaad37,DISK],
  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
 4eb8-8183-8d8ff5f24115,DISK],
  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
 43f8-91c9-d8517e68414a,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
 845-b043-8b91ae4017c0,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
 89b-8209-4307f3296211,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
 5fd-ae0f-cc6eb268b0d2,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
 601-8070-f6c5da840e09,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
 94d-87ee-bcfff2182a96,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
 8d3-b858-a023b5c44e9c,DISK]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
 98c-a487-5ce6aaa66f48,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
 e20-a360-e7cdad5dacc3,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
 c8f-8a13-7be37ce769c9,DISK]]

 and in the spark UI i see the Locality Level is  RACK_LOCAL. for that
 task

 if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
 10.11.0.228, because these 2 nodes has all the four blocks needed for
 computation
 but the executor is running in 10.11.0.225

 my theory is not applying anywhere.

 please help me in understanding how spark/yarn calculates number of
 executors/tasks.

 Thanks,
 -Yeshwanth


>>>
>


-- 
Best Regards,
Ayan Guha


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread ayan guha
Hi

RACK_LOCAL = Task running on the same rack but not on the same node where
data is
NODE_LOCAL = task and data is co-located. Probably you were looking for
this one?

GZIP - the read goes through the GZIP codec, but because gzip is
non-splittable, you can have at most 1 task reading a gzip file. Now, the
content of the gzip file may be spread across multiple nodes. Example: a
GZIP file of say 1 GB with a block size of 256 MB (i.e. 4 blocks). Assume
not all 4 blocks are on the same data node.

When you start reading the gzip file, 1 task will be assigned. It will read
local blocks if available, and it will read remote blocks (streaming read).
While reading the stream, the gzip codec will uncompress the data.

This really is not a Spark thing, but a Hadoop input format discussion.
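
A rough way to confirm the effect from a Spark job, assuming an existing
SparkSession named 'spark' and hypothetical paths:

// A non-splittable compressed file comes back as a single partition,
// i.e. a single read task, regardless of how many HDFS blocks it spans
Dataset<Row> gz = spark.read().csv("hdfs:///data/big.csv.gz");
System.out.println(gz.javaRDD().getNumPartitions());    // typically 1

// The same data uncompressed (or in a splittable format) yields one
// partition per split, so the read can run in parallel
Dataset<Row> plain = spark.read().csv("hdfs:///data/big.csv");
System.out.println(plain.javaRDD().getNumPartitions());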

HTH?

On Wed, Nov 23, 2016 at 10:00 AM, yeshwanth kumar 
wrote:

> Hi Ayan,
>
> we have  default rack topology.
>
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Tue, Nov 22, 2016 at 6:37 AM, ayan guha  wrote:
>
>> Because snappy is not splittable, so single task makes sense.
>>
>> Are sure about rack topology? Ie 225 is in a different rack than 227 or
>> 228? What does your topology file says?
>> On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:
>>
>>> Thanks for your reply,
>>>
>>> i can definitely change the underlying compression format.
>>> but i am trying to understand the Locality Level,
>>> why executor ran on a different node, where the blocks are not present,
>>> when Locality Level is RACK_LOCAL
>>>
>>> can you shed some light on this.
>>>
>>>
>>> Thanks,
>>> Yesh
>>>
>>>
>>> -Yeshwanth
>>> Can you Imagine what I would do if I could do all I can - Art of War
>>>
>>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke 
>>> wrote:
>>>
 Use as a format orc, parquet or avro because they support any
 compression type with parallel processing. Alternatively split your file in
 several smaller ones. Another alternative would be bzip2 (but slower in
 general) or Lzo (usually it is not included by default in many
 distributions).

 On 21 Nov 2016, at 23:17, yeshwanth kumar 
 wrote:

 Hi,

 we are running Hive on Spark, we have an external table over snappy
 compressed csv file of size 917.4 M
 HDFS block size is set to 256 MB

 as per my Understanding, if i run a query over that external table , it
 should launch 4 tasks. one for each block.
 but i am seeing one executor and one task processing all the file.

 trying to understand the reason behind,

 i went one step further to understand the block locality
 when i get the block locations for that file, i found

 [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
 4a8f-be48-b0953fdaad37,DISK],
  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
 4eb8-8183-8d8ff5f24115,DISK],
  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
 43f8-91c9-d8517e68414a,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
 845-b043-8b91ae4017c0,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
 89b-8209-4307f3296211,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
 5fd-ae0f-cc6eb268b0d2,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
 601-8070-f6c5da840e09,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
 94d-87ee-bcfff2182a96,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
 8d3-b858-a023b5c44e9c,DISK]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
 98c-a487-5ce6aaa66f48,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
 e20-a360-e7cdad5dacc3,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
 c8f-8a13-7be37ce769c9,DISK]]

 and in the spark UI i see the Locality Level is  RACK_LOCAL. for that
 task

 if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
 10.11.0.228, because these 2 nodes has all the four blocks needed for
 computation
 but the executor is running in 10.11.0.225

 my theory is not applying anywhere.

 please help me in understanding how spark/yarn calculates number of
 executors/tasks.

 Thanks,
 -Yeshwanth


>>>
>


-- 
Best Regards,
Ayan Guha


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread yeshwanth kumar
Hi Ayan,

we have  default rack topology.



-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Tue, Nov 22, 2016 at 6:37 AM, ayan guha  wrote:

> Because snappy is not splittable, so single task makes sense.
>
> Are sure about rack topology? Ie 225 is in a different rack than 227 or
> 228? What does your topology file says?
> On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:
>
>> Thanks for your reply,
>>
>> i can definitely change the underlying compression format.
>> but i am trying to understand the Locality Level,
>> why executor ran on a different node, where the blocks are not present,
>> when Locality Level is RACK_LOCAL
>>
>> can you shed some light on this.
>>
>>
>> Thanks,
>> Yesh
>>
>>
>> -Yeshwanth
>> Can you Imagine what I would do if I could do all I can - Art of War
>>
>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke 
>> wrote:
>>
>>> Use as a format orc, parquet or avro because they support any
>>> compression type with parallel processing. Alternatively split your file in
>>> several smaller ones. Another alternative would be bzip2 (but slower in
>>> general) or Lzo (usually it is not included by default in many
>>> distributions).
>>>
>>> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
>>>
>>> Hi,
>>>
>>> we are running Hive on Spark, we have an external table over snappy
>>> compressed csv file of size 917.4 M
>>> HDFS block size is set to 256 MB
>>>
>>> as per my Understanding, if i run a query over that external table , it
>>> should launch 4 tasks. one for each block.
>>> but i am seeing one executor and one task processing all the file.
>>>
>>> trying to understand the reason behind,
>>>
>>> i went one step further to understand the block locality
>>> when i get the block locations for that file, i found
>>>
>>> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
>>> 4a8f-be48-b0953fdaad37,DISK],
>>>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
>>> 4eb8-8183-8d8ff5f24115,DISK],
>>>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
>>> 43f8-91c9-d8517e68414a,DISK]]
>>>
>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
>>> 845-b043-8b91ae4017c0,DISK],
>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
>>> 89b-8209-4307f3296211,DISK],
>>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
>>> 5fd-ae0f-cc6eb268b0d2,DISK]]
>>>
>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
>>> 601-8070-f6c5da840e09,DISK],
>>> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
>>> 94d-87ee-bcfff2182a96,DISK],
>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
>>> 8d3-b858-a023b5c44e9c,DISK]
>>>
>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
>>> 98c-a487-5ce6aaa66f48,DISK],
>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
>>> e20-a360-e7cdad5dacc3,DISK],
>>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
>>> c8f-8a13-7be37ce769c9,DISK]]
>>>
>>> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that
>>> task
>>>
>>> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
>>> 10.11.0.228, because these 2 nodes has all the four blocks needed for
>>> computation
>>> but the executor is running in 10.11.0.225
>>>
>>> my theory is not applying anywhere.
>>>
>>> please help me in understanding how spark/yarn calculates number of
>>> executors/tasks.
>>>
>>> Thanks,
>>> -Yeshwanth
>>>
>>>
>>


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread ayan guha
Because snappy is not splittable, a single task makes sense.

Are you sure about the rack topology? I.e. is 225 in a different rack than
227 or 228? What does your topology file say?
On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:

> Thanks for your reply,
>
> i can definitely change the underlying compression format.
> but i am trying to understand the Locality Level,
> why executor ran on a different node, where the blocks are not present,
> when Locality Level is RACK_LOCAL
>
> can you shed some light on this.
>
>
> Thanks,
> Yesh
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke  wrote:
>
>> Use as a format orc, parquet or avro because they support any compression
>> type with parallel processing. Alternatively split your file in several
>> smaller ones. Another alternative would be bzip2 (but slower in general) or
>> Lzo (usually it is not included by default in many distributions).
>>
>> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
>>
>> Hi,
>>
>> we are running Hive on Spark, we have an external table over snappy
>> compressed csv file of size 917.4 M
>> HDFS block size is set to 256 MB
>>
>> as per my Understanding, if i run a query over that external table , it
>> should launch 4 tasks. one for each block.
>> but i am seeing one executor and one task processing all the file.
>>
>> trying to understand the reason behind,
>>
>> i went one step further to understand the block locality
>> when i get the block locations for that file, i found
>>
>> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
>> 4a8f-be48-b0953fdaad37,DISK],
>>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
>> 4eb8-8183-8d8ff5f24115,DISK],
>>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
>> 43f8-91c9-d8517e68414a,DISK]]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
>> 845-b043-8b91ae4017c0,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
>> 89b-8209-4307f3296211,DISK],
>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
>> 5fd-ae0f-cc6eb268b0d2,DISK]]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
>> 601-8070-f6c5da840e09,DISK],
>> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
>> 94d-87ee-bcfff2182a96,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
>> 8d3-b858-a023b5c44e9c,DISK]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
>> 98c-a487-5ce6aaa66f48,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
>> e20-a360-e7cdad5dacc3,DISK],
>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
>> c8f-8a13-7be37ce769c9,DISK]]
>>
>> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
>>
>> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
>> 10.11.0.228, because these 2 nodes has all the four blocks needed for
>> computation
>> but the executor is running in 10.11.0.225
>>
>> my theory is not applying anywhere.
>>
>> please help me in understanding how spark/yarn calculates number of
>> executors/tasks.
>>
>> Thanks,
>> -Yeshwanth
>>
>>
>


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread yeshwanth kumar
Thanks for your reply,

I can definitely change the underlying compression format,
but I am trying to understand the locality level:
why did the executor run on a different node, where the blocks are not
present, when the locality level is RACK_LOCAL?

Can you shed some light on this?


Thanks,
Yesh


-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke  wrote:

> Use as a format orc, parquet or avro because they support any compression
> type with parallel processing. Alternatively split your file in several
> smaller ones. Another alternative would be bzip2 (but slower in general) or
> Lzo (usually it is not included by default in many distributions).
>
> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
>
> Hi,
>
> we are running Hive on Spark, we have an external table over snappy
> compressed csv file of size 917.4 M
> HDFS block size is set to 256 MB
>
> as per my Understanding, if i run a query over that external table , it
> should launch 4 tasks. one for each block.
> but i am seeing one executor and one task processing all the file.
>
> trying to understand the reason behind,
>
> i went one step further to understand the block locality
> when i get the block locations for that file, i found
>
> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-
> 48e1-4a8f-be48-b0953fdaad37,DISK],
>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-
> ce0c-4eb8-8183-8d8ff5f24115,DISK],
>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-
> b030-43f8-91c9-d8517e68414a,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-
> 4845-b043-8b91ae4017c0,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-
> 489b-8209-4307f3296211,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-
> 45fd-ae0f-cc6eb268b0d2,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-
> 4601-8070-f6c5da840e09,DISK],
> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-
> 494d-87ee-bcfff2182a96,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-
> 48d3-b858-a023b5c44e9c,DISK]
>
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-
> 498c-a487-5ce6aaa66f48,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-
> 4e20-a360-e7cdad5dacc3,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-
> 4c8f-8a13-7be37ce769c9,DISK]]
>
> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
>
> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
> 10.11.0.228, because these 2 nodes has all the four blocks needed for
> computation
> but the executor is running in 10.11.0.225
>
> my theory is not applying anywhere.
>
> please help me in understanding how spark/yarn calculates number of
> executors/tasks.
>
> Thanks,
> -Yeshwanth
>
>


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread Jörn Franke
Use ORC, Parquet or Avro as the format, because they support any compression 
type with parallel processing. Alternatively, split your file into several 
smaller ones. Another alternative would be bzip2 (but slower in general) or 
LZO (usually not included by default in many distributions).
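
For example, a one-off conversion along these lines lets later queries scan
the data in parallel; the paths are hypothetical, and Parquet remains
splittable even with snappy compression because the compression is applied
per block inside the file:

// Read the single compressed CSV once, then rewrite it as Parquet so that
// subsequent queries get one task per split instead of one task per file
Dataset<Row> csv = spark.read().option("header", "true").csv("hdfs:///data/raw_csv/");
csv.write().option("compression", "snappy").parquet("hdfs:///data/table_parquet/");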

> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
> 
> Hi,
> 
> we are running Hive on Spark, we have an external table over snappy 
> compressed csv file of size 917.4 M
> HDFS block size is set to 256 MB
> 
> as per my Understanding, if i run a query over that external table , it 
> should launch 4 tasks. one for each block.
> but i am seeing one executor and one task processing all the file.
> 
> trying to understand the reason behind,
> 
> i went one step further to understand the block locality 
> when i get the block locations for that file, i found
> 
> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
>  
>  
> DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
>  
>  
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]
>  
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
>  
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]
> 
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]
> 
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
>  
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
>  
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]
> 
> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
> 
> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or 
> 10.11.0.228, because these 2 nodes has all the four blocks needed for 
> computation
> but the executor is running in 10.11.0.225
> 
> my theory is not applying anywhere.
> 
> please help me in understanding how spark/yarn calculates number of 
> executors/tasks.
> 
> Thanks,
> -Yeshwanth


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread Aniket Bhatnagar
Try changing compression to bzip2 or lzo. For reference -
http://comphadoop.weebly.com

Thanks,
Aniket

On Mon, Nov 21, 2016, 10:18 PM yeshwanth kumar 
wrote:

> Hi,
>
> we are running Hive on Spark, we have an external table over snappy
> compressed csv file of size 917.4 M
> HDFS block size is set to 256 MB
>
> as per my Understanding, if i run a query over that external table , it
> should launch 4 tasks. one for each block.
> but i am seeing one executor and one task processing all the file.
>
> trying to understand the reason behind,
>
> i went one step further to understand the block locality
> when i get the block locations for that file, i found
>
> [DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
>  DatanodeInfoWithStorage[10.11.0.227:50010
> ,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
>  DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010
> ,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
> DatanodeInfoWithStorage[10.11.0.227:50010
> ,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010
> ,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]
>
> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
>
> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
> 10.11.0.228, because these 2 nodes has all the four blocks needed for
> computation
> but the executor is running in 10.11.0.225
>
> my theory is not applying anywhere.
>
> please help me in understanding how spark/yarn calculates number of
> executors/tasks.
>
> Thanks,
> -Yeshwanth
>


RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread yeshwanth kumar
Hi,

We are running Hive on Spark. We have an external table over a
snappy-compressed CSV file of size 917.4 MB, and the HDFS block size is set
to 256 MB.

As per my understanding, if I run a query over that external table, it
should launch 4 tasks, one for each block, but I am seeing one executor and
one task processing the whole file.

Trying to understand the reason behind this,

I went one step further to look at the block locality.
When I get the block locations for that file, I find:

[DatanodeInfoWithStorage[10.11.0.226:50010
,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010
,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010
,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]

DatanodeInfoWithStorage[10.11.0.226:50010
,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
DatanodeInfoWithStorage[10.11.0.228:50010
,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
DatanodeInfoWithStorage[10.11.0.225:50010
,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]

DatanodeInfoWithStorage[10.11.0.226:50010
,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
DatanodeInfoWithStorage[10.11.0.227:50010
,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
DatanodeInfoWithStorage[10.11.0.228:50010
,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]

DatanodeInfoWithStorage[10.11.0.226:50010
,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
DatanodeInfoWithStorage[10.11.0.228:50010
,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
DatanodeInfoWithStorage[10.11.0.225:50010
,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]

In the Spark UI I see the locality level is RACK_LOCAL for that task.

If it is RACK_LOCAL then it should run on either node 10.11.0.226 or
10.11.0.228, because these 2 nodes have all four blocks needed for the
computation, but the executor is running on 10.11.0.225.

My theory does not apply anywhere.

Please help me understand how Spark/YARN calculates the number of
executors/tasks.

Thanks,
-Yeshwanth


Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Mich Talebzadeh
>>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>>>
>>>> Could you print out the sql execution plan? My guess is about broadcast
>>>> join.
>>>>
>>>>
>>>>
>>>> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>>> here and is there a way we can optimize the queries in SPARK without the
>>>> obvious hack in Query2.
>>>>
>>>>
>>>> ---
>>>> ENVIRONMENT:
>>>> ---
>>>>
>>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>>> million rows. Both the files are single gzipped csv file.
>>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>>> accessed through SPARK using HiveContext
>>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>>>> allowMaximumResource allocation and node types are c3.4xlarge).
>>>>
>>>> --
>>>> QUERY1:
>>>> --
>>>> select A.PK, B.FK
>>>> from A
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>>
>>>>
>>>>
>>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>>>
>>>>
>>>> --
>>>> QUERY 2:
>>>> --
>>>>
>>>> select A.PK, B.FK
>>>> from (select PK from A) A
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>>
>>>> This query takes 4.5 mins in SPARK
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Gourav Sengupta
Hi,

We do have a dimension table with around a few hundred columns, from which we
need only a few columns to join with the main fact table, which has a few
million rows. I do not know how one-off this case sounds, but since I have
been working in data warehousing it sounds like a fairly common use case.

Spark in local mode will be way faster compared to Spark running on Hadoop.
I have a system with 64 GB RAM and an SSD, and the performance of Spark in
local mode on it is way better.

Did your join include the same number of columns and rows for the dimension
table?


Regards,
Gourav Sengupta

On Thu, Jun 16, 2016 at 9:35 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> sounds like this is a one off case.
>
> Do you have any other use case where you have Hive on MR outperforms Spark?
>
> I did some tests on 1 billion row table getting the selectivity of a
> column using Hive on MR, Hive on Spark engine and Spark running on local
> mode (to keep it simple)
>
>
> Hive 2, Spark 1.6.1
>
> Results:
>
> Hive with map-reduce --> 18  minutes
> Hive on Spark engine -->  6 minutes
> Spark-->  2 minutes
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 16 June 2016 at 08:43, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> I agree here.
>>
>> However it depends always on your use case !
>>
>> Best regards
>>
>> On 16 Jun 2016, at 04:58, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>> Hi Mahender,
>>
>> please ensure that for dimension tables you are enabling the broadcast
>> method. You must be able to see surprising gains @12x.
>>
>> Overall I think that SPARK cannot figure out whether to scan all the
>> columns in a table or just the ones which are being used causing this
>> issue.
>>
>> When you start using HIVE with ORC and TEZ  (*) you will see some amazing
>> results, and leaves SPARK way way behind. So pretty much you need to have
>> your data in memory for matching the performance claims of SPARK and the
>> advantage in that case you are getting is not because of SPARK algorithms
>> but just fast I/O from RAM. The advantage of SPARK is that it makes
>> accessible analytics, querying, and streaming frameworks together.
>>
>>
>> In case you are following the optimisations mentioned in the link you
>> hardly have any reasons for using SPARK SQL:
>> http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And
>> imagine being able to do all of that without having machines which requires
>> huge RAM, or in short you are achieving those performance gains using
>> commodity low cost systems around which HADOOP was designed.
>>
>> I think that Hortonworks is giving a stiff competition here :)
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam <
>> mahender.bigd...@outlook.com> wrote:
>>
>>> +1,
>>>
>>> Even see performance degradation while comparing SPark SQL with Hive.
>>> We have table of 260 columns. We have executed in hive and SPARK. In
>>> Hive, it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4
>>> mins of time.
>>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>>
>>> Could you print out the sql execution plan? My guess is about broadcast
>>> join.
>>>
>>>
>>>
>>> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>> here and is there a way we can optimize the queries in SPARK without the
>>> obvious hack in Query2.
>>>
>>>
>>> ---
>>> ENVIRONMENT:
>>> ---
>>>
>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>> million rows. Both the files are single gzipped csv file.
>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>> accessed through SPARK using HiveContext
>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>>> allowMaximumResource allocation and node types are c3.4xlarge).
>>>
>>> --
>>> QUERY1:
>>> --
>>> select A.PK, B.FK
>>> from A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>>
>>>
>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>>
>>>
>>> --
>>> QUERY 2:
>>> --
>>>
>>> select A.PK, B.FK
>>> from (select PK from A) A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>> This query takes 4.5 mins in SPARK
>>>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>>
>>>
>>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Mich Talebzadeh
Sounds like this is a one-off case.

Do you have any other use case where you have Hive on MR outperforms Spark?

I did some tests on 1 billion row table getting the selectivity of a column
using Hive on MR, Hive on Spark engine and Spark running on local mode (to
keep it simple)


Hive 2, Spark 1.6.1

Results:

Hive with map-reduce --> 18 minutes
Hive on Spark engine -->  6 minutes
Spark                -->  2 minutes


HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 16 June 2016 at 08:43, Jörn Franke <jornfra...@gmail.com> wrote:

> I agree here.
>
> However it depends always on your use case !
>
> Best regards
>
> On 16 Jun 2016, at 04:58, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
> Hi Mahender,
>
> please ensure that for dimension tables you are enabling the broadcast
> method. You must be able to see surprising gains @12x.
>
> Overall I think that SPARK cannot figure out whether to scan all the
> columns in a table or just the ones which are being used causing this
> issue.
>
> When you start using HIVE with ORC and TEZ  (*) you will see some amazing
> results, and leaves SPARK way way behind. So pretty much you need to have
> your data in memory for matching the performance claims of SPARK and the
> advantage in that case you are getting is not because of SPARK algorithms
> but just fast I/O from RAM. The advantage of SPARK is that it makes
> accessible analytics, querying, and streaming frameworks together.
>
>
> In case you are following the optimisations mentioned in the link you
> hardly have any reasons for using SPARK SQL:
> http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And
> imagine being able to do all of that without having machines which requires
> huge RAM, or in short you are achieving those performance gains using
> commodity low cost systems around which HADOOP was designed.
>
> I think that Hortonworks is giving a stiff competition here :)
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam <
> mahender.bigd...@outlook.com> wrote:
>
>> +1,
>>
>> Even see performance degradation while comparing SPark SQL with Hive.
>> We have table of 260 columns. We have executed in hive and SPARK. In
>> Hive, it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4
>> mins of time.
>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>
>> Could you print out the sql execution plan? My guess is about broadcast
>> join.
>>
>>
>>
>> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>
>> Hi,
>>
>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
>> and is there a way we can optimize the queries in SPARK without the obvious
>> hack in Query2.
>>
>>
>> ---
>> ENVIRONMENT:
>> ---
>>
>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>> million rows. Both the files are single gzipped csv file.
>> > Both table A and B are external tables in AWS S3 and created in HIVE
>> accessed through SPARK using HiveContext
>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>> allowMaximumResource allocation and node types are c3.4xlarge).
>>
>> --
>> QUERY1:
>> --
>> select A.PK, B.FK
>> from A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>>
>>
>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>
>>
>> --
>> QUERY 2:
>> --
>>
>> select A.PK, B.FK
>> from (select PK from A) A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>> This query takes 4.5 mins in SPARK
>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-16 Thread Jörn Franke
I agree here.

However it depends always on your use case ! 

Best regards

> On 16 Jun 2016, at 04:58, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> 
> Hi Mahender, 
> 
> please ensure that for dimension tables you are enabling the broadcast 
> method. You must be able to see surprising gains @12x. 
> 
> Overall I think that SPARK cannot figure out whether to scan all the columns 
> in a table or just the ones which are being used causing this issue. 
> 
> When you start using HIVE with ORC and TEZ  (*) you will see some amazing 
> results, and leaves SPARK way way behind. So pretty much you need to have 
> your data in memory for matching the performance claims of SPARK and the 
> advantage in that case you are getting is not because of SPARK algorithms but 
> just fast I/O from RAM. The advantage of SPARK is that it makes accessible 
> analytics, querying, and streaming frameworks together.
> 
> 
> In case you are following the optimisations mentioned in the link you hardly 
> have any reasons for using SPARK SQL: 
> http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And 
> imagine being able to do all of that without having machines which requires 
> huge RAM, or in short you are achieving those performance gains using 
> commodity low cost systems around which HADOOP was designed. 
> 
> I think that Hortonworks is giving a stiff competition here :)
> 
> Regards,
> Gourav Sengupta
> 
>> On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam 
>> <mahender.bigd...@outlook.com> wrote:
>> +1,
>> 
>> Even see performance degradation while comparing SPark SQL with Hive. 
>> We have table of 260 columns. We have executed in hive and SPARK. In Hive, 
>> it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4 mins 
>> of time. 
>>> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>>> Could you print out the sql execution plan? My guess is about broadcast 
>>> join. 
>>> 
>>> 
>>> 
>>> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here 
>>>> and is there a way we can optimize the queries in SPARK without the 
>>>> obvious hack in Query2.
>>>> 
>>>> 
>>>> ---
>>>> ENVIRONMENT:
>>>> ---
>>>> 
>>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3 
>>>> > million rows. Both the files are single gzipped csv file.
>>>> > Both table A and B are external tables in AWS S3 and created in HIVE 
>>>> > accessed through SPARK using HiveContext
>>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using 
>>>> > allowMaximumResource allocation and node types are c3.4xlarge).
>>>> 
>>>> --
>>>> QUERY1: 
>>>> --
>>>> select A.PK, B.FK
>>>> from A 
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>> 
>>>> 
>>>> 
>>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK 
>>>> 
>>>> 
>>>> --
>>>> QUERY 2:
>>>> --
>>>> 
>>>> select A.PK, B.FK
>>>> from (select PK from A) A 
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>> 
>>>> This query takes 4.5 mins in SPARK 
>>>> 
>>>> 
>>>> 
>>>> Regards,
>>>> Gourav Sengupta
> 


Re: HIVE Query 25x faster than SPARK Query

2016-06-15 Thread Gourav Sengupta
Hi Mahender,

please ensure that you are enabling the broadcast method for dimension
tables. You should be able to see surprising gains of around 12x.
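
A minimal sketch of that broadcast approach (Spark 1.5+; the table and
column names below are made-up placeholders, not from the original thread):

import org.apache.spark.sql.functions.broadcast

// Mark the small dimension table for broadcast so the planner ships it to
// every executor instead of shuffling the large fact table.
val fact = sqlContext.table("fact_table")
val dim  = sqlContext.table("dim_table")
val joined = fact.join(broadcast(dim), fact("dim_key") === dim("key"))
joined.explain(true)  // the physical plan should show BroadcastHashJoin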

Overall I think that SPARK cannot figure out whether to scan all the
columns in a table or just the ones that are actually used, and that is
causing this issue.

When you start using HIVE with ORC and TEZ  (*) you will see some amazing
results that leave SPARK way, way behind. So pretty much you need to have
your data in memory to match the performance claims of SPARK, and the
advantage you get in that case comes not from SPARK's algorithms but simply
from fast I/O out of RAM. The advantage of SPARK is that it brings
analytics, querying, and streaming frameworks together in an accessible way.


If you follow the optimisations mentioned in this link, you hardly have any
reason left for using SPARK SQL:
http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/ . And
imagine being able to do all of that without machines that require huge
RAM; in short, you achieve those performance gains using the commodity,
low-cost systems around which HADOOP was designed.

I think that Hortonworks is giving stiff competition here :)

Regards,
Gourav Sengupta

On Wed, Jun 15, 2016 at 11:35 PM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> +1,
>
> Even see performance degradation while comparing SPark SQL with Hive.
> We have table of 260 columns. We have executed in hive and SPARK. In Hive,
> it is taking 66 sec for 1 gb of data whereas in Spark, it is taking 4 mins
> of time.
> On 6/9/2016 3:19 PM, Gavin Yue wrote:
>
> Could you print out the sql execution plan? My guess is about broadcast
> join.
>
>
>
> On Jun 9, 2016, at 07:14, Gourav Sengupta < <gourav.sengu...@gmail.com>
> gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
> and is there a way we can optimize the queries in SPARK without the obvious
> hack in Query2.
>
>
> ---
> ENVIRONMENT:
> ---
>
> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
> million rows. Both the files are single gzipped csv file.
> > Both table A and B are external tables in AWS S3 and created in HIVE
> accessed through SPARK using HiveContext
> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
> allowMaximumResource allocation and node types are c3.4xlarge).
>
> --
> QUERY1:
> --
> select A.PK, B.FK
> from A
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
>
>
>
> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>
>
> --
> QUERY 2:
> --
>
> select A.PK, B.FK
> from (select PK from A) A
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
>
> This query takes 4.5 mins in SPARK
>
>
>
> Regards,
> Gourav Sengupta
>
>
>
>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-15 Thread Mahender Sarangam
+1.

We also see performance degradation when comparing Spark SQL with Hive.
We have a table of 260 columns and have executed the same workload in both
Hive and Spark. In Hive it takes 66 sec for 1 GB of data, whereas in Spark
it takes 4 minutes.

On 6/9/2016 3:19 PM, Gavin Yue wrote:
Could you print out the sql execution plan? My guess is about broadcast join.



On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

Query1 is almost 25x faster in HIVE than in SPARK. What is happening here and 
is there a way we can optimize the queries in SPARK without the obvious hack in 
Query2.


---
ENVIRONMENT:
---

> Table A 533 columns x 24 million rows and Table B has 2 columns x 3 million 
> rows. Both the files are single gzipped csv file.
> Both table A and B are external tables in AWS S3 and created in HIVE accessed 
> through SPARK using HiveContext
> EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using 
> allowMaximumResource allocation and node types are c3.4xlarge).

--
QUERY1:
--
select A.PK, B.FK
from A
left outer join B on (A.PK = B.FK)
where B.FK is not null;



This query takes 4 mins in HIVE and 1.1 hours in SPARK


--
QUERY 2:
--

select A.PK, B.FK
from (select PK from A) A
left outer join B on (A.PK = B.FK)
where B.FK is not null;

This query takes 4.5 mins in SPARK



Regards,
Gourav Sengupta






Re: HIVE Query 25x faster than SPARK Query

2016-06-10 Thread Gourav Sengupta
Hi Gavin,

for the first time someone is responding to this thread with a meaningful
conversation - thanks for that.

Okay, I did not tweak the spark.sql.autoBroadcastJoinThreshold parameter,
and since the cached field was around 75 MB I do not think that a broadcast
join was used.
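
A small sketch (on a 1.6-era SQLContext) of checking and raising that
threshold so a ~75 MB table would still qualify for a broadcast join; the
100 MB value below is purely illustrative:

// The default threshold is 10 MB; print it, then raise it (value in bytes).
println(sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold"))
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)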

But I will surely be excited to see if I am going wrong here and post the
results of sql.describe(). Thanks a ton once again.


Hi Ted,

Is there anyway you can throw some light on this before I post this in a
blog?


Regards,
Gourav Sengupta


On Fri, Jun 10, 2016 at 7:22 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> Yes.  because in the second query, you did a  (select PK from A) A .  I
>  guess it could the the subquery makes the results much smaller and make
> the broadcastJoin, so it is much faster.
>
> you could use sql.describe() to check the execution plan.
>
>
> On Fri, Jun 10, 2016 at 1:41 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> I think if we try to see why is Query 2 faster than Query 1 then all the
>> answers will be given without beating around the bush. That is the right
>> way to find out what is happening and why.
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Jun 9, 2016 at 11:19 PM, Gavin Yue <yue.yuany...@gmail.com>
>> wrote:
>>
>>> Could you print out the sql execution plan? My guess is about broadcast
>>> join.
>>>
>>>
>>>
>>> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>> here and is there a way we can optimize the queries in SPARK without the
>>> obvious hack in Query2.
>>>
>>>
>>> ---
>>> ENVIRONMENT:
>>> ---
>>>
>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>> million rows. Both the files are single gzipped csv file.
>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>> accessed through SPARK using HiveContext
>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>>> allowMaximumResource allocation and node types are c3.4xlarge).
>>>
>>> --
>>> QUERY1:
>>> --
>>> select A.PK, B.FK
>>> from A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>>
>>>
>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>>
>>>
>>> --
>>> QUERY 2:
>>> --
>>>
>>> select A.PK, B.FK
>>> from (select PK from A) A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>> This query takes 4.5 mins in SPARK
>>>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>>
>>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-10 Thread Gavin Yue
Yes, because in the second query you did a (select PK from A) A. I guess
the subquery makes the results much smaller and enables the broadcast join,
so it is much faster.

You could use sql.describe() to check the execution plan.
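
In case it helps, a minimal sketch of inspecting the physical plan with
DataFrame.explain() (the query text is taken from the thread; the table
registrations are assumed):

val df = sqlContext.sql(
  "select A.PK, B.FK from A left outer join B on (A.PK = B.FK) where B.FK is not null")
// Look for BroadcastHashJoin vs. SortMergeJoin in the printed physical plan.
df.explain(true)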


On Fri, Jun 10, 2016 at 1:41 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> I think if we try to see why is Query 2 faster than Query 1 then all the
> answers will be given without beating around the bush. That is the right
> way to find out what is happening and why.
>
>
> Regards,
> Gourav
>
> On Thu, Jun 9, 2016 at 11:19 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> Could you print out the sql execution plan? My guess is about broadcast
>> join.
>>
>>
>>
>> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
>> and is there a way we can optimize the queries in SPARK without the obvious
>> hack in Query2.
>>
>>
>> ---
>> ENVIRONMENT:
>> ---
>>
>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>> million rows. Both the files are single gzipped csv file.
>> > Both table A and B are external tables in AWS S3 and created in HIVE
>> accessed through SPARK using HiveContext
>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>> allowMaximumResource allocation and node types are c3.4xlarge).
>>
>> --
>> QUERY1:
>> --
>> select A.PK, B.FK
>> from A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>>
>>
>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>
>>
>> --
>> QUERY 2:
>> --
>>
>> select A.PK, B.FK
>> from (select PK from A) A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>> This query takes 4.5 mins in SPARK
>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-10 Thread Gourav Sengupta
Hi,

I think if we try to see why Query 2 is faster than Query 1, then all the
answers will be given without beating around the bush. That is the right
way to find out what is happening and why.


Regards,
Gourav

On Thu, Jun 9, 2016 at 11:19 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> Could you print out the sql execution plan? My guess is about broadcast
> join.
>
>
>
> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
> Hi,
>
> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
> and is there a way we can optimize the queries in SPARK without the obvious
> hack in Query2.
>
>
> ---
> ENVIRONMENT:
> ---
>
> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
> million rows. Both the files are single gzipped csv file.
> > Both table A and B are external tables in AWS S3 and created in HIVE
> accessed through SPARK using HiveContext
> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
> allowMaximumResource allocation and node types are c3.4xlarge).
>
> --
> QUERY1:
> --
> select A.PK, B.FK
> from A
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
>
>
>
> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>
>
> --
> QUERY 2:
> --
>
> select A.PK, B.FK
> from (select PK from A) A
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
>
> This query takes 4.5 mins in SPARK
>
>
>
> Regards,
> Gourav Sengupta
>
>
>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Gavin Yue
Could you print out the sql execution plan? My guess is about broadcast join. 



> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> 
> Hi,
> 
> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here and 
> is there a way we can optimize the queries in SPARK without the obvious hack 
> in Query2.
> 
> 
> ---
> ENVIRONMENT:
> ---
> 
> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3 million 
> > rows. Both the files are single gzipped csv file.
> > Both table A and B are external tables in AWS S3 and created in HIVE 
> > accessed through SPARK using HiveContext
> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using 
> > allowMaximumResource allocation and node types are c3.4xlarge).
> 
> --
> QUERY1: 
> --
> select A.PK, B.FK
> from A 
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
> 
> 
> 
> This query takes 4 mins in HIVE and 1.1 hours in SPARK 
> 
> 
> --
> QUERY 2:
> --
> 
> select A.PK, B.FK
> from (select PK from A) A 
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
> 
> This query takes 4.5 mins in SPARK 
> 
> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> 


Re: HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Mich Talebzadeh
I still fail to see how Hive can be orders of magnitude faster compared to
Spark.

Assuming that Hive is using map-reduce, I cannot see a real case for Hive
being faster, at least under normal operations.

Don't get me wrong, I am a fan of Hive. The performance of Hive comes from
deploying an execution engine (mr, spark, tez) to do the execution of the
work.

If we leave that aside for now the other influencing factor would be Hive
Optimizer compared to Spark Optimizer.

If I go back to thread owner point and quote:

"Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
and is there a way we can optimize the queries in SPARK without the obvious
hack in Query2.

Table A 533 columns x 24 million rows and Table B has 2 columns x 3 million
rows. Both the files are single gzipped csv file.
> Both table A and B are external tables in AWS S3 and created in HIVE
accessed through SPARK using HiveContext
> EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
allowMaximumResource allocation and node types are c3.4xlarge).

To take it further and make some reasonable deduction:

With gzipped files:

   1. Hive will not be able to split the csv files into chunks/blocks and
   run multiple maps in parallel.
   2. Spark will give you an RDD with only 1 partition (as of 0.9.0). This
   is because gzipped files are not splittable. If you do not repartition
   the RDD, any operations on that RDD will be limited to a single core (a
   minimal repartition sketch follows below).
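
A minimal sketch of the repartition step mentioned in point 2 (the S3 path
and the target parallelism are placeholders, not from the original thread):

// A single gzipped file is not splittable, so textFile() returns an RDD
// with one partition; repartition it before doing any heavy work on it.
val rawA = sc.textFile("s3://some-bucket/tableA.csv.gz")
val tableA = rawA.repartition(sc.defaultParallelism)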

So with gzipped files both Hive and Spark have issues, and neither table
has a very large number of rows. With Spark, were temporary tables
deployed? That IMO does help performance. It is possible that Spark has
been spilling to disk.

We really need the output from the GUI (jobs, stages and spillage) to
deduce whether there was indeed spillage to disk by Spark (see
TungstenAggregate).

[Spark UI screenshot referenced here did not survive in the archive]


HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 9 June 2016 at 21:40, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi Stephen,
>
>
> How can a single gzipped CSV file be partitioned and who partitions tables
> based on Primary Key in Hive?  If you read the environments section you
> will be able to see that all the required details are mentioned.
>
> As far as I understand that Hive does work 25x faster (in these particular
> cases) and around 100x faster (when we are using TEZ) when compared to
> SPARK.
>
> It will be interesting to see if Ted includes these findings while they
> are benchmarking SPARK. This is a very typical and a general used case.
>
>
> Regards,
> Gourav
>
> On Thu, Jun 9, 2016 at 5:11 PM, Stephen Boesch <java...@gmail.com> wrote:
>
>> ooc are the tables partitioned on a.pk and b.fk?  Hive might be using
>> copartitioning in that case: it is one of hive's strengths.
>>
>> 2016-06-09 7:28 GMT-07:00 Gourav Sengupta <gourav.sengu...@gmail.com>:
>>
>>> Hi Mich,
>>>
>>> does not Hive use map-reduce? I thought it to be so. And since I am
>>> running the queries in EMR 4.6 therefore HIVE is not using TEZ.
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Thu, Jun 9, 2016 at 3:25 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> are you using map-reduce with Hive?
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 9 June 2016 at 15:14, Gourav Sengupta <gourav.sengu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>>>> here and is there a way we can optimize the queries in SPARK without the
>>>>> obvious hack in Query2.
>>>>>
>>>>>
>>>>> ---
>>>>> ENVIRONMENT:
>>>>> ---
>>>>>
>>>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>>>> million rows. Both the files are single gzipped csv file.
>>>>> > Both table A and B are external tables in AWS S3 and cre

Re: HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Gourav Sengupta
Hi Stephen,


How can a single gzipped CSV file be partitioned, and who partitions tables
based on the primary key in Hive? If you read the environment section you
will see that all the required details are mentioned.

As far as I understand, Hive does work 25x faster (in these particular
cases) and around 100x faster (when we are using TEZ) compared to SPARK.

It will be interesting to see whether Ted includes these findings while
they are benchmarking SPARK. This is a very typical and generally used
case.


Regards,
Gourav

On Thu, Jun 9, 2016 at 5:11 PM, Stephen Boesch <java...@gmail.com> wrote:

> ooc are the tables partitioned on a.pk and b.fk?  Hive might be using
> copartitioning in that case: it is one of hive's strengths.
>
> 2016-06-09 7:28 GMT-07:00 Gourav Sengupta <gourav.sengu...@gmail.com>:
>
>> Hi Mich,
>>
>> does not Hive use map-reduce? I thought it to be so. And since I am
>> running the queries in EMR 4.6 therefore HIVE is not using TEZ.
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Jun 9, 2016 at 3:25 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> are you using map-reduce with Hive?
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 9 June 2016 at 15:14, Gourav Sengupta <gourav.sengu...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>>> here and is there a way we can optimize the queries in SPARK without the
>>>> obvious hack in Query2.
>>>>
>>>>
>>>> ---
>>>> ENVIRONMENT:
>>>> ---
>>>>
>>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>>> million rows. Both the files are single gzipped csv file.
>>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>>> accessed through SPARK using HiveContext
>>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>>>> allowMaximumResource allocation and node types are c3.4xlarge).
>>>>
>>>> --
>>>> QUERY1:
>>>> --
>>>> select A.PK, B.FK
>>>> from A
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>>
>>>>
>>>>
>>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>>>
>>>>
>>>> --
>>>> QUERY 2:
>>>> --
>>>>
>>>> select A.PK, B.FK
>>>> from (select PK from A) A
>>>> left outer join B on (A.PK = B.FK)
>>>> where B.FK is not null;
>>>>
>>>> This query takes 4.5 mins in SPARK
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Stephen Boesch
Out of curiosity, are the tables partitioned on a.pk and b.fk? Hive might
be using co-partitioning in that case: it is one of Hive's strengths.

2016-06-09 7:28 GMT-07:00 Gourav Sengupta <gourav.sengu...@gmail.com>:

> Hi Mich,
>
> does not Hive use map-reduce? I thought it to be so. And since I am
> running the queries in EMR 4.6 therefore HIVE is not using TEZ.
>
>
> Regards,
> Gourav
>
> On Thu, Jun 9, 2016 at 3:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
>> are you using map-reduce with Hive?
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 9 June 2016 at 15:14, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening
>>> here and is there a way we can optimize the queries in SPARK without the
>>> obvious hack in Query2.
>>>
>>>
>>> ---
>>> ENVIRONMENT:
>>> ---
>>>
>>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>>> million rows. Both the files are single gzipped csv file.
>>> > Both table A and B are external tables in AWS S3 and created in HIVE
>>> accessed through SPARK using HiveContext
>>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>>> allowMaximumResource allocation and node types are c3.4xlarge).
>>>
>>> --
>>> QUERY1:
>>> --
>>> select A.PK, B.FK
>>> from A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>>
>>>
>>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>>
>>>
>>> --
>>> QUERY 2:
>>> --
>>>
>>> select A.PK, B.FK
>>> from (select PK from A) A
>>> left outer join B on (A.PK = B.FK)
>>> where B.FK is not null;
>>>
>>> This query takes 4.5 mins in SPARK
>>>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>>
>>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Gourav Sengupta
Hi Mich,

Doesn't Hive use map-reduce? I thought it did. And since I am running the
queries on EMR 4.6, HIVE is not using TEZ.


Regards,
Gourav

On Thu, Jun 9, 2016 at 3:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> are you using map-reduce with Hive?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 June 2016 at 15:14, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
>> and is there a way we can optimize the queries in SPARK without the obvious
>> hack in Query2.
>>
>>
>> ---
>> ENVIRONMENT:
>> ---
>>
>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>> million rows. Both the files are single gzipped csv file.
>> > Both table A and B are external tables in AWS S3 and created in HIVE
>> accessed through SPARK using HiveContext
>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>> allowMaximumResource allocation and node types are c3.4xlarge).
>>
>> ------
>> QUERY1:
>> --
>> select A.PK, B.FK
>> from A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>>
>>
>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>
>>
>> --
>> QUERY 2:
>> --
>>
>> select A.PK, B.FK
>> from (select PK from A) A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>> This query takes 4.5 mins in SPARK
>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Mich Talebzadeh
are you using map-reduce with Hive?

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 9 June 2016 at 15:14, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
> and is there a way we can optimize the queries in SPARK without the obvious
> hack in Query2.
>
>
> ---
> ENVIRONMENT:
> ---
>
> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
> million rows. Both the files are single gzipped csv file.
> > Both table A and B are external tables in AWS S3 and created in HIVE
> accessed through SPARK using HiveContext
> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
> allowMaximumResource allocation and node types are c3.4xlarge).
>
> --
> QUERY1:
> --
> select A.PK, B.FK
> from A
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
>
>
>
> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>
>
> --
> QUERY 2:
> --
>
> select A.PK, B.FK
> from (select PK from A) A
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
>
> This query takes 4.5 mins in SPARK
>
>
>
> Regards,
> Gourav Sengupta
>
>
>
>


HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Gourav Sengupta
Hi,

Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
and is there a way we can optimize the queries in SPARK without the obvious
hack in Query2.


---
ENVIRONMENT:
---

> Table A 533 columns x 24 million rows and Table B has 2 columns x 3
million rows. Both the files are single gzipped csv file.
> Both table A and B are external tables in AWS S3 and created in HIVE
accessed through SPARK using HiveContext
> EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
allowMaximumResource allocation and node types are c3.4xlarge).

--
QUERY1:
--
select A.PK, B.FK
from A
left outer join B on (A.PK = B.FK)
where B.FK is not null;



This query takes 4 mins in HIVE and 1.1 hours in SPARK


------
QUERY 2:
--

select A.PK, B.FK
from (select PK from A) A
left outer join B on (A.PK = B.FK)
where B.FK is not null;

This query takes 4.5 mins in SPARK



Regards,
Gourav Sengupta


Re: hive on spark query error

2015-09-25 Thread Marcelo Vanzin
Seems like you have "hive.server2.enable.doAs" enabled; you can either
disable it, or configure hs2 so that the user running the service
("hadoop" in your case) can impersonate others.

See:
https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-common/Superusers.html
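
For reference, a hedged example of what the proxy-user entries in
core-site.xml could look like for the "hadoop" user from the error above
(the wildcard values are placeholders; restrict them as appropriate):

  <property>
    <name>hadoop.proxyuser.hadoop.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hadoop.groups</name>
    <value>*</value>
  </property>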

On Fri, Sep 25, 2015 at 10:33 AM, Garry Chen  wrote:
> 2015-09-25 13:31:16,245 INFO  [stderr-redir-1]: client.SparkClientImpl 
> (SparkClientImpl.java:run(569)) - ERROR: 
> org.apache.hadoop.security.authorize.AuthorizationException: User: hadoop is 
> not allowed to impersonate HIVEAPP

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



hive on spark query error

2015-09-25 Thread Garry Chen
Hi All,
I am following
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started?
to set up Hive on Spark. After setup/configuration everything starts up; I
am able to show tables, but when executing a SQL statement within beeline I
get an error. Please help, and thank you very much.

Cluster Environment (3 nodes) as following
hadoop-2.7.1
spark-1.4.1-bin-hadoop2.6
zookeeper-3.4.6
apache-hive-1.2.1-bin

Error from hive log:
2015-09-25 11:51:03,123 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting 
impersonation of oracle
2015-09-25 11:51:03,133 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client 
driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit 
--proxy-user oracle --properties-file 
/tmp/spark-submit.840692098393819749.properties --class 
org.apache.hive.spark.client.RemoteDriver 
/u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host 
ip-10-92-82-229.ec2.internal --remote-port 40476 --conf 
hive.spark.client.connect.timeout=1000 --conf 
hive.spark.client.server.connect.timeout=9 --conf 
hive.spark.client.channel.log.level=null --conf 
hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 
--conf hive.spark.client.secret.bits=256
2015-09-25 11:51:03,867 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.server.connect.timeout=9
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.threads=8
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.connect.timeout=1000
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.secret.bits=256
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.max.size=52428800
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Error: Master must start with yarn, spark, 
mesos, or local
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Run with --help for usage help or --verbose 
for debug output
2015-09-25 11:51:03,885 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 11:51:03 INFO util.Utils: Shutdown 
hook called
2015-09-25 11:51:03,889 WARN  [Driver]: client.SparkClientImpl 
(SparkClientImpl.java:run(427)) - Child process exited with code 1.




RE: hive on spark query error

2015-09-25 Thread Garry Chen
Yes, you are right. I made the change and also linked hive-site.xml into
the Spark conf directory. Rerunning the SQL, I am getting this error in
hive.log:

2015-09-25 13:31:14,750 INFO  [HiveServer2-Handler-Pool: Thread-125]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting 
impersonation of HIVEAPP
2015-09-25 13:31:14,750 INFO  [HiveServer2-Handler-Pool: Thread-125]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client 
driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit 
--executor-memory 512m --proxy-user HIVEAPP --properties-file 
/tmp/spark-submit.4348738410387344124.properties --class 
org.apache.hive.spark.client.RemoteDriver 
/u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host 
ip-10-92-82-229.ec2.internal --remote-port 48481 --conf 
hive.spark.client.connect.timeout=1000 --conf 
hive.spark.client.server.connect.timeout=9 --conf 
hive.spark.client.channel.log.level=null --conf 
hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 
--conf hive.spark.client.secret.bits=256
2015-09-25 13:31:15,473 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.server.connect.timeout=9
2015-09-25 13:31:15,473 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.threads=8
2015-09-25 13:31:15,474 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.connect.timeout=1000
2015-09-25 13:31:15,474 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.secret.bits=256
2015-09-25 13:31:15,474 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.max.size=52428800
2015-09-25 13:31:15,718 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 13:31:15 WARN util.NativeCodeLoader: 
Unable to load native-hadoop library for your platform... using builtin-java 
classes where applicable
2015-09-25 13:31:16,063 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 13:31:16 INFO client.RMProxy: 
Connecting to ResourceManager at /0.0.0.0:8032
2015-09-25 13:31:16,245 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - ERROR: 
org.apache.hadoop.security.authorize.AuthorizationException: User: hadoop is 
not allowed to impersonate HIVEAPP
2015-09-25 13:31:16,248 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 13:31:16 INFO util.Utils: Shutdown 
hook called
2015-09-25 13:31:16,265 WARN  [Driver]: client.SparkClientImpl 
(SparkClientImpl.java:run(427)) - Child process exited with code 1.

-Original Message-
From: Marcelo Vanzin [mailto:van...@cloudera.com] 
Sent: Friday, September 25, 2015 1:12 PM
To: Garry Chen <g...@cornell.edu>
Cc: Jimmy Xiang <jxi...@cloudera.com>; user@spark.apache.org
Subject: Re: hive on spark query error

On Fri, Sep 25, 2015 at 10:05 AM, Garry Chen <g...@cornell.edu> wrote:
> In spark-defaults.conf the spark.master  is  spark://hostname:7077.  
> From hive-site.xml  
> spark.master
> hostname
>   

That's not a valid value for spark.master (as the error indicates).
You should set it to "spark://hostname:7077", as you have it in 
spark-defaults.conf (or perhaps remove the setting from hive-site.xml, I think 
hive will honor your spark-defaults.conf).

--
Marcelo


RE: hive on spark query error

2015-09-25 Thread Garry Chen
In spark-defaults.conf the spark.master is spark://hostname:7077.  From
hive-site.xml:
  <property>
    <name>spark.master</name>
    <value>hostname</value>
  </property>



From: Jimmy Xiang [mailto:jxi...@cloudera.com]
Sent: Friday, September 25, 2015 1:00 PM
To: Garry Chen <g...@cornell.edu>
Cc: user@spark.apache.org
Subject: Re: hive on spark query error

> Error: Master must start with yarn, spark, mesos, or local
What's your setting for spark.master?

On Fri, Sep 25, 2015 at 9:56 AM, Garry Chen <g...@cornell.edu> wrote:
Hi All,
I am following 
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started?
 to setup hive on spark.  After setup/configuration everything startup I am 
able to show tables but when executing sql statement within beeline I got 
error.  Please help and thank you very much.

Cluster Environment (3 nodes) as following
hadoop-2.7.1
spark-1.4.1-bin-hadoop2.6
zookeeper-3.4.6
apache-hive-1.2.1-bin

Error from hive log:
2015-09-25 11:51:03,123 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting 
impersonation of oracle
2015-09-25 11:51:03,133 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client 
driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit 
--proxy-user oracle --properties-file 
/tmp/spark-submit.840692098393819749.properties --class 
org.apache.hive.spark.client.RemoteDriver 
/u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host 
ip-10-92-82-229.ec2.internal --remote-port 40476 --conf 
hive.spark.client.connect.timeout=1000 --conf 
hive.spark.client.server.connect.timeout=9 --conf 
hive.spark.client.channel.log.level=null --conf 
hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 
--conf hive.spark.client.secret.bits=256
2015-09-25 11:51:03,867 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.server.connect.timeout=9
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.threads=8
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.connect.timeout=1000
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.secret.bits=256
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.max.size=52428800
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Error: Master must start with yarn, spark, 
mesos, or local
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Run with --help for usage help or --verbose 
for debug output
2015-09-25 11:51:03,885 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 11:51:03 INFO util.Utils: Shutdown 
hook called
2015-09-25 11:51:03,889 WARN  [Driver]: client.SparkClientImpl 
(SparkClientImpl.java:run(427)) - Child process exited with code 1.





Re: hive on spark query error

2015-09-25 Thread Marcelo Vanzin
On Fri, Sep 25, 2015 at 10:05 AM, Garry Chen  wrote:
> In spark-defaults.conf the spark.master  is  spark://hostname:7077.  From
> hive-site.xml  
> spark.master
> hostname
>   

That's not a valid value for spark.master (as the error indicates).
You should set it to "spark://hostname:7077", as you have it in
spark-defaults.conf (or perhaps remove the setting from hive-site.xml,
I think hive will honor your spark-defaults.conf).
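
If kept in hive-site.xml, the corrected entry would presumably look like
this (hostname being a placeholder for the actual master host):

  <property>
    <name>spark.master</name>
    <value>spark://hostname:7077</value>
  </property>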

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark query

2015-07-08 Thread Harish Butani
try the spark-datetime package:
https://github.com/SparklineData/spark-datetime
Follow this example
https://github.com/SparklineData/spark-datetime#a-basic-example to get the
different attributes of a DateTime.

On Wed, Jul 8, 2015 at 9:11 PM, prosp4300 prosp4...@163.com wrote:

 As mentioned in Spark sQL programming guide, Spark SQL support Hive UDFs,
 please take a look below builtin UDFs of Hive, get day of year should be as
 simply as existing RDBMS

 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions


 At 2015-07-09 12:02:44, Ravisankar Mani rrav...@gmail.com wrote:

 Hi everyone,

 I can't get 'day of year'  when using spark query. Can you help any way to
 achieve day of year?

 Regards,
 Ravi






Spark query

2015-07-08 Thread Ravisankar Mani
Hi everyone,

I can't get 'day of year' when using a Spark query. Can you suggest any way
to achieve day of year?

Regards,
Ravi


Re: Spark query

2015-07-08 Thread Brandon White
Convert the column to a column of java Timestamps. Then you can do the
following

import java.sql.Timestamp
import java.util.Calendar

// Extracts a calendar field from a timestamp. Note: java.util.Calendar has
// no plain DAY field; "day" here returns the day of year that was asked
// about in this thread.
def date_trunc(timestamp: Timestamp, timeField: String): Int = {
  timeField match {
    case "hour" =>
      val cal = Calendar.getInstance()
      cal.setTimeInMillis(timestamp.getTime())
      cal.get(Calendar.HOUR_OF_DAY)

    case "day" =>
      val cal = Calendar.getInstance()
      cal.setTimeInMillis(timestamp.getTime())
      cal.get(Calendar.DAY_OF_YEAR)
  }
}

sqlContext.udf.register("date_trunc", date_trunc _)
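
A short, hedged usage sketch (the temp table "events" and its Timestamp
column "ts" are made up for illustration); the second query shows the Hive
built-ins route for day of year mentioned earlier in the thread:

// Calling the registered UDF from SQL:
sqlContext.sql("SELECT date_trunc(ts, 'day') AS day_of_year FROM events").show()

// Day of year with Hive built-in UDFs only, no custom UDF needed:
sqlContext.sql(
  "SELECT datediff(to_date(ts), concat(cast(year(ts) as string), '-01-01')) + 1 " +
  "AS day_of_year FROM events").show()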

On Wed, Jul 8, 2015 at 9:23 PM, Harish Butani rhbutani.sp...@gmail.com
wrote:

 try the spark-datetime package:
 https://github.com/SparklineData/spark-datetime
 Follow this example
 https://github.com/SparklineData/spark-datetime#a-basic-example to get
 the different attributes of a DateTime.

 On Wed, Jul 8, 2015 at 9:11 PM, prosp4300 prosp4...@163.com wrote:

 As mentioned in Spark sQL programming guide, Spark SQL support Hive UDFs,
 please take a look below builtin UDFs of Hive, get day of year should be as
 simply as existing RDBMS

 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions


 At 2015-07-09 12:02:44, Ravisankar Mani rrav...@gmail.com wrote:

 Hi everyone,

 I can't get 'day of year'  when using spark query. Can you help any way
 to achieve day of year?

 Regards,
 Ravi