Re: Spark SQL table Join, one task is taking long
Have you tried joins on regular RDDs instead of SchemaRDDs? We have found them to be about 10x faster than joins between SchemaRDDs.

val largeRDD = ...
val smallRDD = ...
largeRDD.join(smallRDD) // the SchemaRDD join would run for a long time

The only limitation I see with that approach is that a regular RDD supports only 22 fields unless you use nested case classes.

On Thu, Dec 4, 2014 at 12:57 PM, Venkat Subramanian wrote:
> [...]
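For readers who want to try the plain-RDD join suggested above, here is a minimal sketch. It assumes a running SparkContext `sc` and Spark 1.1-era APIs; the file paths, field positions, and case class shapes are hypothetical stand-ins for the tables discussed in this thread.

```scala
// Hedged sketch of the plain pair-RDD join suggested above (Spark 1.1 era).
// Paths, field indices, and case class fields are hypothetical.
case class Agent(f1: Int, name: String)
case class Contact(f6: Int, f5: String)

// Key each RDD by its join column so join() can pair rows up.
val smallRDD = sc.textFile("hdfs:///DAgents.csv")
  .map(_.split(","))
  .map(a => (a(0).toInt, Agent(a(0).toInt, a(1))))      // keyed by f1

val largeRDD = sc.textFile("hdfs:///ContactDetail.csv")
  .map(_.split(","))
  .map(a => (a(5).toInt, Contact(a(5).toInt, a(4))))    // keyed by f6

// Push the selective predicate (f1 = 902) to the small side BEFORE joining,
// so only matching keys are shuffled.
val joined = largeRDD.join(smallRDD.filter { case (k, _) => k == 902 })
joined.map { case (_, (contact, _)) => contact.f5 }.count()
```

Filtering the small side before the join is the key move: it mirrors what a good optimizer would do with the `DAgents.f1 = 902` predicate.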
Re: Spark SQL table Join, one task is taking long
Hi Cheng,

Thank you very much for taking your time and providing a detailed explanation. I tried a few things you suggested and some more things.

The ContactDetail table (8 GB) is the fact table and DAgents is the Dim table (<500 KB), the reverse of what you are assuming, but your ideas still apply. I tried the following:

a) Cached the smaller Dim table to memory:

sqlContext.setConf("spark.sql.autoBroadcastJoinShreshold", "1000")
sqlContext.cacheTable("DAgents")

UI -> Stage -> Storage shows it to be cached as an RDD when I run it.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count

I see no difference in terms of performance. The query takes the same amount of time, ~1.2 min.

b) I reversed both the order of the tables and the where clause in the query:

val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")

The performance went bad: it took 6-7 min to complete. Changing just the table order in the SELECT for this join, while keeping the same where-clause order, gave similar perf (1.2-1.4 min).

c) Using the query in a), I tried to keep the storage in columnar fashion with:

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

I see no difference in terms of performance; the query takes the same ~1.2 min. Not sure if it even works.

d) I tried changing the comma-separated HDFS files to Parquet format in HDFS, reading them as Parquet, and then running the query on that.
DAgents.saveAsParquetFile("DAgents.parquet")
FCDRDD.saveAsParquetFile("ContactDetail.parquet")

val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
DAgentsParquetRDD.registerAsTable("DAgentsParquet")

val FContactDetailParquetRDD = sqlContext.parquetFile("ContactDetail.parquet")
FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")

val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet, DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and DAgentsParquet.f1 = 902")
CDJoinQryParquet.map(ta => ta(4)).count

*The query time is actually longer for this join query.* It ended up taking 3.4 min, with more data read (2 GB) in shuffle reads. Parquet performed worse than non-Parquet for this join.

I changed the query so the table order and where clause were reversed, and ran it against Parquet:

val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet, ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and DAgentsParquet.f1 = ContactDetailParquet.f6")
CDJoinQryParquetReversed.map(ta => ta(4)).count

It took > 18 min, and I had to kill it as it kept on running.

*But for queries with no join, Parquet's performance was extremely good.* For example, the query below, which has no join, ran in 8 seconds, whereas the same query on non-Parquet data took 30 seconds:

val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet WHERE ContactDetailParquet.f6 = 902")
CDJoinQryParquet0.map(ta => ta(4)).count

*Some potential conclusions (please comment):*

* Order in the where clause seems to matter to the Spark SQL optimizer. In the relational DBs I have worked with, the order of the where clause is typically just a hint. It would be nice if the Spark SQL optimizer were fixed to ignore clause order and optimize automatically.
* I tried changing just the table order in the SELECT statement for a join, and it also seems to matter when reading data from HDFS (for Parquet, and to a lesser extent for non-Parquet in my case), even when the where-clause order is the same. It would be nice if the SQL optimizer handled this automatically.

* Table joins on huge tables are costly. The fact and dimension concepts from star schemas don't translate well to Big Data (Hadoop, Spark). It may be better to de-normalize and store huge tables to avoid joins; joins seem to be evil. (I have tried de-normalizing when using Cassandra, but that has its own problem of resulting in full table scans when running ad-hoc queries where the keys are not known.)

Regards,

Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
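A tentative side note on step a) above: the config key as typed, spark.sql.autoBroadcastJoinShreshold, does not match the property name Spark actually reads (spark.sql.autoBroadcastJoinThreshold), so the setting may never have taken effect. The value is also a size in bytes, so even with the correct key, "1000" (~1 KB) would sit below the ~500 KB dim table and would disable rather than enable the broadcast. A corrected sketch, assuming Spark 1.1 and an existing sqlContext:

```scala
// Hedged sketch: correct property name, and a byte-denominated threshold
// comfortably above the ~500 KB DAgents table so it qualifies for broadcast.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (1024 * 1024).toString) // 1 MB
sqlContext.cacheTable("DAgents")

val q = sqlContext.sql(
  "SELECT * FROM ContactDetail, DAgents " +
  "WHERE ContactDetail.f6 = DAgents.f1 AND DAgents.f1 = 902")
q.map(ta => ta(4)).count()
```

If the broadcast kicks in, the large ContactDetail table is no longer shuffled at all, which is exactly the skewed single-partition cost this thread is chasing.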
Re: Spark SQL table Join, one task is taking long
Hey Venkat,

This behavior seems reasonable. Judging by the table names, I guess here DAgents should be the fact table and ContactDetails is the dim table. Below is an explanation of a similar query; you may read `src` as `DAgents` and `src1` as `ContactDetails`.

0: jdbc:hive2://localhost:1> explain extended select * from src, src1 where src.key = src1.key and src.key = 100;

== Parsed Logical Plan ==
'Project [*]
 'Filter (('src.key = 'src1.key) && ('src.key = 100))
  'Join Inner, None
   'UnresolvedRelation None, src, None
   'UnresolvedRelation None, src1, None

== Analyzed Logical Plan ==
Project [key#81,value#82,key#83,value#84]
 Filter ((key#81 = key#83) && (key#81 = 100))
  Join Inner, None
   MetastoreRelation default, src, None
   MetastoreRelation default, src1, None

== Optimized Logical Plan ==
Project [key#81,value#82,key#83,value#84]
 Join Inner, Some((key#81 = key#83))
  Filter (key#81 = 100)
   MetastoreRelation default, src, None
  MetastoreRelation default, src1, None

== Physical Plan ==
Project [key#81,value#82,key#83,value#84]
 ShuffledHashJoin [key#81], [key#83], BuildRight
  Exchange (HashPartitioning [key#81], 200)
   Filter (key#81 = 100)
    HiveTableScan [key#81,value#82], (MetastoreRelation default, src, None), None
  Exchange (HashPartitioning [key#83], 200)
   HiveTableScan [key#83,value#84], (MetastoreRelation default, src1, None), None

Code Generation: false
== RDD ==

Please notice the Filter node in the physical plan. In your case, all the filtered rows are shuffled into a single partition because DAgents.f1 is both the predicate key and the shuffle key, and that partition is handled by the task that lasts for more than 1 second. All other tasks in the count stage cost only a few ms because they don't receive any rows from DAgents.
If ContactDetails is small enough, you can cache ContactDetails first and set spark.sql.autoBroadcastJoinShreshold larger than the size of ContactDetails; a broadcast join rather than a shuffled hash join would then be performed, which usually results in better performance.

Cheng

On 12/2/14 6:35 AM, Venkat Subramanian wrote:

Environment: Spark 1.1, 4-node Spark and Hadoop dev cluster - 6 cores, 32 GB RAM each. Default serialization, standalone, no security. Data was sqooped from a relational DB to HDFS, and the data is partitioned across HDFS uniformly.

I am reading a fact table about 8 GB in size and one small dim table from HDFS, then doing a join on them based on a criterion. Running the driver in the Spark shell on the Spark master. ContactDetail and DAgents are read as RDDs and registered as tables already. Each of these tables has 60 to 90 fields, and I am using the Product class.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count // resul
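To make the mechanics above concrete, here is a plain-Scala (no Spark) sketch of what a broadcast hash join does: the small dim table becomes an in-memory map shipped to every task, and the large fact table is probed in place, so nothing on the large side is shuffled and no single skewed partition can dominate the stage. The sample rows are invented for illustration.

```scala
// Plain-Scala illustration of a broadcast (map-side) hash join.
// The small table becomes a hash map; each fact row does a local lookup.
val dAgents = Seq(901 -> "alice", 902 -> "bob")                 // small dim table
val contactDetail = Seq(902 -> "c1", 902 -> "c2", 903 -> "c3")  // large fact table

val broadcast: Map[Int, String] = dAgents.toMap                 // what gets broadcast
val joined = contactDetail.flatMap { case (f6, detail) =>
  broadcast.get(f6).map(name => (f6, detail, name))             // map-side probe
}
// 903 has no match in dAgents, so that row drops out of the result.
println(joined)
```

Contrast this with the ShuffledHashJoin in the plan above, where both sides are hash-partitioned on the join key and every row with f1 = 902 lands in one partition.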
Re: Spark SQL table Join, one task is taking long
Bump up. Michael Armbrust, anybody from the Spark SQL team?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20218.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.