Re: Spark SQL table Join, one task is taking long

2014-12-04 Thread Veeranagouda Mukkanagoudar
Have you tried joins on regular RDDs instead of SchemaRDDs? We have found
them to be about 10 times faster than joins between SchemaRDDs.

val largeRDD = ...
val smallRDD = ...

largeRDD.join(smallRDD)  // the other way (SchemaRDD JOIN) would run for a long time

The only limitation I see with that approach is that a regular RDD supports
only 22 fields unless you use nested case classes.
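
A minimal sketch of the idea (Spark 1.1-era API; the field ordinals, the Int
key type and the rddJoin name are assumptions, adjust them to your schema):

import org.apache.spark.SparkContext._   // brings in join() for pair RDDs
import org.apache.spark.sql.SchemaRDD

// Key each SchemaRDD by its join column, then join as plain pair RDDs.
def rddJoin(contactDetail: SchemaRDD, dAgents: SchemaRDD) = {
  val largeRDD = contactDetail.map(row => (row.getInt(5), row))  // assumes f6 is ordinal 5, Int
  val smallRDD = dAgents.map(row => (row.getInt(0), row))        // assumes f1 is ordinal 0, Int
  largeRDD.join(smallRDD)                                        // RDD[(Int, (Row, Row))]
}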



On Thu, Dec 4, 2014 at 12:57 PM, Venkat Subramanian wrote:

> Hi Cheng,
>
> Thank you very much for taking your time and providing a detailed
> explanation.
> I tried a few things you suggested and some more things.
>
> The ContactDetail table (8 GB) is the fact table and DAgents is the Dim
> table (<500 KB), the reverse of what you assumed, but your ideas still
> apply.
>
> I tried the following:
>
> a) Cached the smaller Dim table in memory.
>  sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1000")
>  sqlContext.cacheTable("DAgents")
>
> The UI (Stage -> Storage) shows it cached as an RDD when I run it.
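>
> (The broadcast threshold is specified in bytes; a sketch of a value that
> would comfortably cover the <500 KB DAgents table, assuming the planner has
> size statistics for it:
>
>  sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10485760")  // 10 MB, in bytes
> )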
>
> val CDJoinQry= sqlContext.sql("SELECT  * FROM ContactDetail, DAgents  WHERE
> ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
>
> CDJoinQry.map(ta => ta(4)).count
>
> I see no difference in performance. The query takes about the same time,
> ~1.2 min.
>
> b) I reversed both the table order and the WHERE clause order in the query:
>
> val CDJoinQry= sqlContext.sql("SELECT  * FROM DAgents, ContactDetail  WHERE
> DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")
>
> The performance got worse: it took 6-7 min to complete.
>
> Just changing the table order in the SELECT for this join, while keeping the
> same WHERE clause order, gave similar performance (1.2-1.4 min).
>
> c) Using the query in a), I tried keeping the storage in columnar form with
> sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
>
> I see no difference in performance; the query takes the same ~1.2 min.
> Not sure if the setting even takes effect.
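>
> (My assumption is that the compression setting only applies when the table
> is materialized into the in-memory columnar cache; if so, the rough order
> would be:
>
>  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
>  sqlContext.cacheTable("DAgents")
>  sqlContext.sql("SELECT count(*) FROM DAgents").collect()  // first access builds the cache
> )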
>
> d) I converted the comma-separated HDFS files to Parquet format in HDFS,
> read them back as Parquet, and then ran the query on them.
>
> DAgents.saveAsParquetFile("DAgents.parquet")
> FCDRDD.saveAsParquetFile("ContactDetail.parquet")
>
>
> val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
> DAgentsParquetRDD.registerAsTable("DAgentsParquet")
>
> val FContactDetailParquetRDD =
> sqlContext.parquetFile("ContactDetail.parquet")
> FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")
>
> val CDJoinQryParquet= sqlContext.sql("SELECT  * FROM ContactDetailParquet,
> DAgentsParquet  WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and
> DAgentsParquet.f1 = 902")
> CDJoinQryParquet.map(ta => ta(4)).count
>
> *The query time is actually longer for this join query.* It ended up taking
> 3.4 min with more data (2 GB) read in shuffle reads. Parquet performed worse
> than non-Parquet for this join.
>
> I then ran the query with the table order and WHERE clause reversed against
> the Parquet tables:
>
> val CDJoinQryParquetReversed= sqlContext.sql("SELECT  * FROM
> DAgentsParquet,
> ContactDetailParquet  WHERE   DAgentsParquet.f1 = 902 and
> DAgentsParquet.f1=ContactDetailParquet.f6 ")
> CDJoinQryParquetReversed.map(ta => ta(4)).count
>
> It ran for > 18 min and I had to kill it as it kept running.
>
> *But for queries with no join, Parquet's performance was extremely good.*
> For example, the query below, which has no join, ran in 8 seconds, whereas
> the same query on the non-Parquet table took 30 seconds.
> val CDJoinQryParquet0= sqlContext.sql("SELECT  * FROM ContactDetailParquet
> WHERE ContactDetailParquet.f6 = 902")
> CDJoinQryParquet0.map(ta => ta(4)).count
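>
> (Since Parquet is columnar, selecting only the column being counted should
> cut the scan further; a sketch, assuming the fifth column is named f5:
>
> val CDNoJoinParquet = sqlContext.sql("SELECT f5 FROM ContactDetailParquet WHERE ContactDetailParquet.f6 = 902")
> CDNoJoinParquet.count
> )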
>
> *Some potential conclusions (please comment):*
> * WHERE clause order seems to matter to the Spark SQL optimizer. In the
> relational DBs I have worked with, WHERE clause order is typically just a
> hint. It would be nice if the Spark SQL optimizer were fixed to ignore
> clause order and optimize automatically.
> * I tried changing just the table order in the SELECT statement for a join,
> and that also seems to matter when reading data from HDFS (for Parquet, and
> to a lesser extent for non-Parquet in my case), even when the WHERE clause
> order is the same. It would be nice if the SQL optimizer handled this
> automatically.
> * Table joins involving huge tables are costly. Fact and dimension concepts
> from star schemas don't translate well to Big Data (Hadoop, Spark). It may
> be better to de-normalize and store huge tables to avoid joins; joins seem
> to be evil. (I have tried de-normalizing with Cassandra, but that has its
> own problem of full table scans when running ad-hoc queries where the keys
> are not known.)
>
> Regards,
>
> Venkat
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: spark RDD join Error

2014-09-04 Thread Veeranagouda Mukkanagoudar
Thanks a lot, that fixed the issue :)
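
For reference, a minimal version that compiles once that import is in place
(same names as in my original post below):

import org.apache.spark.SparkContext._   // implicit conversion that adds join() to pair RDDs
import org.apache.spark.rdd.RDD

def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]): RDD[(String, Int)] =
  rddA.join(rddB).map { case (k, (a, b)) => (k, a + b) }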


On Thu, Sep 4, 2014 at 4:51 PM, Zhan Zhang  wrote:

> Try this:
> import org.apache.spark.SparkContext._
>
> Thanks.
>
> Zhan Zhang
>
>
> On Sep 4, 2014, at 4:36 PM, Veeranagouda Mukkanagoudar wrote:
>
> I am planning to use the RDD join operation. To test it out, I was trying to
> compile some test code, but I am getting the following compilation error:
>
> *value join is not a member of org.apache.spark.rdd.RDD[(String, Int)]*
> *[error] rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }*
>
> Code:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.rdd.RDD
>
> def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]): RDD[(String, Int)] = {
>   rddA.join(rddB).map { case (k, (a, b)) => (k, a + b) }
> }
>
> Any help would be great.
>
> Veera
>


spark RDD join Error

2014-09-04 Thread Veeranagouda Mukkanagoudar
I am planning to use the RDD join operation. To test it out, I was trying to
compile some test code, but I am getting the following compilation error:

*value join is not a member of org.apache.spark.rdd.RDD[(String, Int)]*
*[error] rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }*

Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]): RDD[(String, Int)] = {
  rddA.join(rddB).map { case (k, (a, b)) => (k, a + b) }
}

Any help would be great.

Veera


Re: confirm subscribe to user@spark.apache.org

2014-07-11 Thread Veeranagouda Mukkanagoudar
On Fri, Jul 11, 2014 at 3:11 PM,  wrote:

> Hi! This is the ezmlm program. I'm managing the
> user@spark.apache.org mailing list.
>
> To confirm that you would like
>
>veera...@gmail.com
>
> added to the user mailing list, please send
> a short reply to this address:
>
>user-sc.1405116686.kijenhjamnjaodhflpgc-veeran54=
> gmail@spark.apache.org
>
> Usually, this happens when you just hit the "reply" button.
> If this does not work, simply copy the address and paste it into
> the "To:" field of a new message.
>
> This confirmation serves two purposes. First, it verifies that I am able
> to get mail through to you. Second, it protects you in case someone
> forges a subscription request in your name.
>
> Please note that ALL Apache dev- and user- mailing lists are publicly
> archived.  Do familiarize yourself with Apache's public archive policy at
>
> http://www.apache.org/foundation/public-archives.html
>
> prior to subscribing and posting messages to user@spark.apache.org.
> If you're not sure whether or not the policy applies to this mailing list,
> assume it does unless the list name contains the word "private" in it.