Re: Spark SQL table Join, one task is taking long
Have you tried joins on regular RDDs instead of SchemaRDDs? We have found them to be about 10 times faster than joins between SchemaRDDs (a fuller sketch appears after the quoted message below):

    val largeRDD = ...
    val smallRDD = ...
    largeRDD.join(smallRDD)  // joined the other way round, the query would run for a long time

The only limitation I see with that implementation is that a regular RDD supports only 22 fields unless you use nested case classes.

On Thu, Dec 4, 2014 at 12:57 PM, Venkat Subramanian wrote:

> Hi Cheng,
>
> Thank you very much for taking the time to provide a detailed explanation.
> I tried a few things you suggested, and some more.
>
> The ContactDetail table (8 GB) is the fact table and DAgents is the
> dimension table (< 500 KB), the reverse of what you are assuming, but your
> ideas still apply.
>
> I tried the following:
>
> a) Cached the smaller dimension table in memory:
>
> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1000")
> sqlContext.cacheTable("DAgents")
>
> The UI (Stage -> Storage) shows it cached as an RDD when I run it.
>
> val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE
>   ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
> CDJoinQry.map(ta => ta(4)).count
>
> I see no difference in performance; the query takes about the same time,
> ~1.2 min.
>
> b) I reversed both the table order and the order of the WHERE clause in
> the query:
>
> val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE
>   DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")
>
> Performance got worse: it took 6-7 min to complete.
>
> Changing just the table order in the SELECT for this join, while keeping
> the same WHERE clause order, performance was similar (1.2-1.4 min).
>
> c) Using the query in a), I tried to keep the storage columnar and
> compressed with
>
> sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
>
> I see no difference in performance; the query takes about the same time,
> ~1.2 min. Not sure it even takes effect.
>
> d) I converted the comma-separated HDFS files to Parquet format in HDFS,
> read them back as Parquet, and ran the query on that:
>
> DAgents.saveAsParquetFile("DAgents.parquet")
> FCDRDD.saveAsParquetFile("ContactDetail.parquet")
>
> val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
> DAgentsParquetRDD.registerAsTable("DAgentsParquet")
>
> val FContactDetailParquetRDD = sqlContext.parquetFile("ContactDetail.parquet")
> FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")
>
> val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet,
>   DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and
>   DAgentsParquet.f1 = 902")
> CDJoinQryParquet.map(ta => ta(4)).count
>
> *The query time is actually longer for this join.* It took 3.4 min, with
> more data read (2 GB) in shuffle reads. Parquet performed worse than
> non-Parquet for this join.
>
> I reversed the table order and WHERE clause and ran it against Parquet:
>
> val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet,
>   ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and
>   DAgentsParquet.f1 = ContactDetailParquet.f6")
> CDJoinQryParquetReversed.map(ta => ta(4)).count
>
> It ran for more than 18 min and I had to kill it, as it kept on running.
>
> *But for queries with no join, Parquet's performance was extremely good.*
> For example, the query below, which has no join, ran in 8 seconds, whereas
> the same query on non-Parquet data took 30 seconds.
> val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet
>   WHERE ContactDetailParquet.f6 = 902")
> CDJoinQryParquet0.map(ta => ta(4)).count
>
> *Some potential conclusions (please comment):*
>
> * The order of the WHERE clause seems to matter to the Spark SQL optimizer.
> In the relational DBs I have worked with, WHERE clause order is typically
> just a hint. It would be nice if the Spark SQL optimizer were fixed to
> ignore clause order and optimize automatically.
> * Changing just the table order in the SELECT statement for a join also
> seems to matter when reading data from HDFS (for Parquet, and to a lesser
> extent for non-Parquet in my case), even when the WHERE clause order is
> the same. It would be nice if the SQL optimizer handled this automatically.
> * Joins on huge tables are costly. The fact and dimension concepts from
> star schemas don't translate well to big data (Hadoop, Spark). It may be
> better to de-normalize and store huge tables to avoid joins; joins seem to
> be evil. (I have tried de-normalizing when using Cassandra, but that has
> its own problem of forcing full table scans for ad-hoc queries when the
> keys are not known.)
>
> Regards,
>
> Venkat
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
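To make the regular-RDD suggestion above concrete, here is a minimal sketch of a plain pair-RDD join for the two tables discussed in this thread. The HDFS paths, column indices, and the Agent case class are hypothetical stand-ins, since the real schemas are not shown in the thread:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // implicit conversion that adds join() to pair RDDs

    case class Agent(id: Int, name: String)

    val sc = new SparkContext(new SparkConf().setAppName("rdd-join-sketch"))

    // Key each RDD by its join column: f6 (index 5 here, an assumption) on
    // the fact side, f1 (index 0) on the dimension side.
    val largeRDD = sc.textFile("hdfs:///data/ContactDetail")
      .map(_.split(","))
      .map(row => (row(5).toInt, row))

    val smallRDD = sc.textFile("hdfs:///data/DAgents")
      .map(_.split(","))
      .map(f => (f(0).toInt, Agent(f(0).toInt, f(1))))

    // Filter the small side before joining, mirroring the DAgents.f1 = 902
    // predicate, so only matching keys are shuffled.
    val joined = largeRDD.join(smallRDD.filter(_._1 == 902))
    joined.map { case (_, (row, _)) => row(4) }.count()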
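Since the dimension table here is under 500 KB, another option worth noting is a hand-rolled broadcast (map-side) join with plain RDDs: collect the small table to the driver, broadcast it, and probe it per fact-table row, so the 8 GB fact table is never shuffled. Again a hedged sketch with illustrative names and paths, continuing the session from the sketch above, not code from the thread:

    import org.apache.spark.SparkContext._  // for collectAsMap() on pair RDDs

    // Collect the tiny dimension table and ship it to every executor.
    val agentsById = sc.textFile("hdfs:///data/DAgents")
      .map(_.split(","))
      .map(f => (f(0).toInt, f))
      .collectAsMap()
    val agentsBc = sc.broadcast(agentsById)

    // Each fact row probes the broadcast map locally; no shuffle occurs.
    val joined = sc.textFile("hdfs:///data/ContactDetail")
      .map(_.split(","))
      .flatMap(row => agentsBc.value.get(row(5).toInt).map(agent => (row, agent)))
    joined.count()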
Re: spark RDD join Error
Thanks a lot, that fixed the issue :)

On Thu, Sep 4, 2014 at 4:51 PM, Zhan Zhang wrote:

> Try this:
>
> import org.apache.spark.SparkContext._
>
> Thanks.
>
> Zhan Zhang
>
> On Sep 4, 2014, at 4:36 PM, Veeranagouda Mukkanagoudar wrote:
>
> I am planning to use the RDD join operation; to test it out, I was trying
> to compile some test code, but I am getting the following compilation
> error:
>
> value join is not a member of org.apache.spark.rdd.RDD[(String, Int)]
> [error] rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }
>
> Code:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.rdd.RDD
>
> def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]):
>   RDD[(String, Int)] = {
>   rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }
> }
>
> Any help would be great.
>
> Veera
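The one-line fix works because join is not defined on RDD itself: it lives in PairRDDFunctions, and importing org.apache.spark.SparkContext._ brings in the implicit conversion that adds it to RDDs of key-value pairs (in the Spark 1.x releases current at the time of this thread). A minimal sketch of the corrected code:

    import org.apache.spark.SparkContext._  // adds join() to RDD[(K, V)]
    import org.apache.spark.rdd.RDD

    def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]): RDD[(String, Int)] =
      // join yields (key, (valueFromA, valueFromB)); sum the two values per key
      rddA.join(rddB).map { case (k, (a, b)) => (k, a + b) }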
spark RDD join Error
I am planning to use the RDD join operation; to test it out, I was trying to compile some test code, but am getting the following compilation error:

value join is not a member of org.apache.spark.rdd.RDD[(String, Int)]
[error] rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }

Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]): RDD[(String, Int)] = {
  rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }
}

Any help would be great.

Veera