Question about Spark, Inner Join and Delegation to a Parquet Table
I have a question about Spark and how it delegates filters to a Parquet-based table. I have two tables in Hive in Parquet format. Table1 has four columns of type double and table2 has two columns of type double. I am doing an INNER JOIN like the following:

SELECT table1.name
FROM table1 INNER JOIN table2
  ON table2.x BETWEEN table1.xmin AND table1.xmax
 AND table2.y BETWEEN table1.ymin AND table1.ymax

I noticed that the execution plan, as reported by Spark, only delegates the IsNotNull filters to the tables and no other filters:

== Physical Plan ==
*Project [name#0]
+- BroadcastNestedLoopJoin BuildRight, Inner, ((((x#36 >= xmin#13) && (x#36 <= xmax#15)) && (y#37 >= ymin#14)) && (y#37 <= ymax#16))
   :- *Project [name#0, xmin#13, ymin#14, xmax#15, ymax#16]
   :  +- *Filter (((isnotnull(xmin#13) && isnotnull(ymin#14)) && isnotnull(ymax#16)) && isnotnull(xmax#15))
   :     +- *FileScan parquet [name#0,xmin#13,ymin#14,xmax#15,ymax#16] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), IsNotNull(ymin), IsNotNull(ymax), IsNotNull(xmax)], ReadSchema: struct
   +- BroadcastExchange IdentityBroadcastMode
      +- *Project [x#36, y#37]
         +- *Filter (isnotnull(y#37) && isnotnull(x#36))
            +- *FileScan parquet [x#36,y#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(y), IsNotNull(x)], ReadSchema: struct

However, when filtering against a single table, the filter is delegated to the table:

SELECT name FROM table1 WHERE table1.xmin > -73.4454183678

== Physical Plan ==
CollectLimit 21
+- *Project [pbkey#150]
   +- *Filter (isnotnull(xmin#163) && (xmin#163 > -73.4454183678))
      +- *FileScan parquet [pbkey#150,xmin#163] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), GreaterThan(xmin,-73.4454183678)], ReadSchema: struct

So the question is: does Spark delegate filters in a join condition to a Parquet table, or is this an error in the "explain plan" output?

Thanks.
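To make the comparison concrete, here is a minimal spark-shell sketch I would use to look at the two plans side by side (table names follow the ones above; the literal bound is only an example value, not from the real data). The BETWEEN predicates reference columns from both tables, so they can only be evaluated by the join operator itself, whereas a predicate that mentions a single table's columns can reach that table's scan:

// Minimal sketch, assuming table1 and table2 exist in the Hive metastore as above.
val joined = spark.sql("""
  SELECT t1.name
  FROM table1 t1 INNER JOIN table2 t2
    ON t2.x BETWEEN t1.xmin AND t1.xmax
   AND t2.y BETWEEN t1.ymin AND t1.ymax
""")
// Only the implied IsNotNull checks show up under PushedFilters for this query.
joined.explain()

// A single-table predicate can be pushed down to the Parquet scan:
val filtered = spark.sql("SELECT name FROM table1 WHERE xmin > -73.4454183678")
filtered.explain()   // expect GreaterThan(xmin, ...) under PushedFilters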
Re: Spark Inner Join on pivoted datasets results in an empty dataset
Is there any limit on the number of columns used in an inner join?

Thank you
Anil Langote

Sent from my iPhone
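A minimal sketch of a many-column inner join, with placeholder column names rather than the real thirty: the usingColumns overload of join takes an arbitrary-length Seq, so there is no fixed limit on how many key columns you pass; what matters is that every listed column matches exactly (and is non-null) on both sides.

// Sketch only, not the original job; runnable in spark-shell or a local app.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("multiColumnJoinSketch").master("local[*]").getOrCreate()
import spark.implicits._

val left  = Seq(("a", "b", 1.0), ("c", "d", 2.0)).toDF("col1", "col2", "vector1_0")
val right = Seq(("a", "b", 3.0), ("c", "x", 4.0)).toDF("col1", "col2", "vector2_0")

// Same pattern as pivot1.join(pivot2, groupByColumns): only rows whose key columns
// agree on both sides survive, so any normalization difference (trailing spaces,
// nulls, differing float formatting) silently empties the result.
val joined = left.join(right, Seq("col1", "col2"), "inner")
joined.show()   // keeps ("a","b"), drops ("c","d") vs ("c","x")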
Spark Inner Join on pivoted datasets results in an empty dataset
Hi All,

I have a requirement to pivot multiple columns on a single pivot column. The pivot API doesn't support doing that, hence I have been pivoting the two columns separately and then trying to merge the datasets, but the result is an empty dataset. Below is the pseudo-code.

Main dataset => 33 columns (30 columns are strings, 2 columns are of type double array, let's say Vector1 and Vector2, and 1 column Decider which has 0 & 1 values).

String groupByColumns = "col1,col2,col3,col4,col5,col6...col30";
Vector columns: Vector1 and Vector2

I do the pivot like below:

List<Object> values = new ArrayList<>();
values.add("0");
values.add("1");

Dataset pivot1 = mainDataset.groupBy(groupByColumns).pivot("Decider", values).agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector1")));
pivot1 = pivot1.withColumnRenamed("0", "Vector1_0");
pivot1 = pivot1.withColumnRenamed("1", "Vector1_1");

Count on pivot1 = 12856

Dataset pivot2 = mainDataset.groupBy(groupByColumns).pivot("Decider", values).agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector2")));
pivot2 = pivot2.withColumnRenamed("0", "Vector2_0");
pivot2 = pivot2.withColumnRenamed("1", "Vector2_1");

Count on pivot2 = 12856

Dataset finalDataset = pivot1.join(pivot2, Seq);

Count on finalDataset = 0? Why? It should be 12856, right?

The same code works locally with fewer columns and 100 records. Is there anything I am missing here? Is there any better way to pivot multiple columns? I cannot combine them because my aggregation columns are arrays of doubles. The pivot1 and pivot2 datasets are derived from the same parent dataset and the group-by columns are the same; all I am doing is an inner join of these two datasets on the same group-by columns, so why doesn't it work?

Thank you
Anil Langote
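A possible alternative, as a sketch only (written in Scala rather than the Java pseudo-code above, and assuming CUSTOM_UDAF is the already-registered UDAF and mainDataset / the group-by columns are as described): pivot(...).agg(...) accepts several aggregate expressions at once, so both vector columns can be pivoted in a single pass and the second dataset plus the join step go away entirely.

// Sketch under the assumptions above; mainDataset is the 33-column dataset from the question.
import org.apache.spark.sql.functions.{callUDF, col}

val groupByColumns = Seq("col1", "col2" /* ... through col30 */).map(col)

val pivoted = mainDataset
  .groupBy(groupByColumns: _*)
  .pivot("Decider", Seq("0", "1"))
  .agg(
    callUDF("CUSTOM_UDAF", col("Vector1")).as("Vector1"),
    callUDF("CUSTOM_UDAF", col("Vector2")).as("Vector2")
  )
// With multiple aggregates, the pivoted columns come out as 0_Vector1, 0_Vector2,
// 1_Vector1, 1_Vector2, so no rename-and-join is needed and the row count stays
// equal to the number of distinct group-by keys.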
spark inner join
Hi All,

In SQL, say for example I have table1 (movieid) and table2 (movieid, moviename). In SQL we write something like:

select moviename, movieid, count(1)
from table2 inner join table1 on table1.movieid = table2.movieid
group by moviename, movieid

Here in SQL table1 has only one column whereas table2 has two columns, and the join still works. In the same way, can Spark join on keys from both RDDs? When I tried to join two RDDs in Spark, both RDDs had to be key-value pairs, so for the single-column RDD I had to add a dummy value 0, for example. Is there another way around this, or am I doing it completely wrong?

val lines=sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.data")
val movienamesfile=sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.item")
val moviesid=lines.map(x => x.split("\t")).map(x => (x(1),0))
val test=moviesid.map(x => x._1)
val movienames=movienamesfile.map(x => x.split("\\|")).map(x => (x(0),x(1)))
val movienamejoined=moviesid.join(movienames).distinct()

Thanks
Sri
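For reference, a sketch of how the pair-RDD version could carry a real value instead of the dummy 0 (the shortened paths and the MovieLens file layouts, tab-separated u.data with the movie id in the second field and pipe-separated u.item with id and title in the first two fields, are assumptions taken from the code above). RDD joins key on the first element of the pair only, and the value types on the two sides may differ, so a per-rating count works just as well as a placeholder:

// Sketch only; adjust paths to the local copy of the MovieLens 100k data.
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("movieJoinSketch").setMaster("local[*]"))

val ratings    = sc.textFile("ml-100k/u.data")
val movieNames = sc.textFile("ml-100k/u.item")

// Count ratings per movie id, keyed on the id; the value side need not match the other RDD.
val ratingCounts = ratings.map(_.split("\t")).map(f => (f(1), 1)).reduceByKey(_ + _)
val names        = movieNames.map(_.split("\\|")).map(f => (f(0), f(1)))

// join produces (movieId, (count, movieName)), the rough equivalent of the SQL
// "select moviename, movieid, count(1) ... group by" above.
val joined = ratingCounts.join(names).map { case (id, (cnt, name)) => (name, id, cnt) }
joined.take(5).foreach(println)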