Question about Spark, Inner Join and Delegation to a Parquet Table

2018-07-02 Thread Mike Buck
I have a question about Spark and how it delegates filters to a Parquet-based 
table. I have two tables in Hive in Parquet format. Table1 has four columns 
of type double and table2 has two columns of type double. I am doing an 
INNER JOIN as follows:

SELECT table1.name FROM table1 INNER JOIN table2 ON table2.x BETWEEN 
table1.xmin AND table1.xmax AND table2.y BETWEEN table1.ymin AND table1.ymax
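
For reference, the DataFrame-API equivalent would be roughly the following 
(a Scala sketch; it assumes both Hive tables are visible to the SparkSession):

// Scala sketch; assumes table1 and table2 are Hive tables visible to the session
val t1 = spark.table("table1")
val t2 = spark.table("table2")

val joined = t1.join(
    t2,
    t2("x").between(t1("xmin"), t1("xmax")) &&
    t2("y").between(t1("ymin"), t1("ymax")),
    "inner")
  .select(t1("name"))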

I noticed that the execution plan as reported by Spark is only delegating the 
IsNotNull filters to the tables and not any of the other filters:

== Physical Plan ==
*Project [name#0]
+- BroadcastNestedLoopJoin BuildRight, Inner, ((((x#36 >= xmin#13) && (x#36 <= xmax#15)) && (y#37 >= ymin#14)) && (y#37 <= ymax#16))
   :- *Project [name#0, xmin#13, ymin#14, xmax#15, ymax#16]
   :  +- *Filter (((isnotnull(xmin#13) && isnotnull(ymin#14)) && isnotnull(ymax#16)) && isnotnull(xmax#15))
   :     +- *FileScan parquet [name#0,xmin#13,ymin#14,xmax#15,ymax#16] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), IsNotNull(ymin), IsNotNull(ymax), IsNotNull(xmax)], ReadSchema: struct
   +- BroadcastExchange IdentityBroadcastMode
      +- *Project [x#36, y#37]
         +- *Filter (isnotnull(y#37) && isnotnull(x#36))
            +- *FileScan parquet [x#36,y#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(y), IsNotNull(x)], ReadSchema: struct

However, when doing a filter against a single table, the filter is delegated to 
the table:

SELECT name FROM table1 where table1.xmin > -73.4454183678

== Physical Plan ==
CollectLimit 21
+- *Project [pbkey#150]
   +- *Filter (isnotnull(xmin#163) && (xmin#163 > -73.4454183678))
      +- *FileScan parquet [pbkey#150,xmin#163] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), GreaterThan(xmin,-73.4454183678)], ReadSchema: struct

So the question is: does Spark delegate filters in a join condition to a 
Parquet table, or is this an error in the "explain plan" output?
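
A minimal way to compare the two cases and pull the PushedFilters out 
programmatically (a Scala sketch; FileSourceScanExec is an internal class, so 
the exact API may differ between Spark versions):

import org.apache.spark.sql.execution.FileSourceScanExec

val joined = spark.sql(
  """SELECT table1.name FROM table1 INNER JOIN table2
    |ON table2.x BETWEEN table1.xmin AND table1.xmax
    |AND table2.y BETWEEN table1.ymin AND table1.ymax""".stripMargin)
joined.explain()   // PushedFilters only shows the IsNotNull checks

val filtered = spark.sql(
  "SELECT name FROM table1 WHERE table1.xmin > -73.4454183678")
filtered.explain() // PushedFilters includes GreaterThan(xmin,-73.4454183678)

// Collect the PushedFilters entry from each Parquet scan in the physical plan
joined.queryExecution.executedPlan.collect {
  case scan: FileSourceScanExec => scan.metadata.get("PushedFilters")
}.foreach(println)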

Thanks.



Re: Spark Inner Join on pivoted datasets results empty dataset

2017-10-19 Thread Anil Langote
Is there any limit on the number of columns used in an inner join?

Thank you
Anil Langote

Sent from my iPhone
_
From: Anil Langote <anillangote0...@gmail.com>
Sent: Thursday, October 19, 2017 5:01 PM
Subject: Spark Inner Join on pivoted datasets results empty dataset
To: user <user@spark.apache.org>


Hi All,

I have a requirement to pivot multiple columns using a single pivot column. The 
pivot API doesn't support doing that, so I have been pivoting the two columns 
separately and then trying to merge the datasets, but the result is an empty 
dataset. Below is the pseudo code.

Main dataset => 33 columns (30 columns of type string, 2 columns of type 
double array, let's say Vector1 and Vector2, and 1 column Decider which has 
0 & 1 values)

String groupByColumns = "col1,col2,col3,col4,col5,col6...col30";
Vector columns: Vector1 and Vector2

I do the pivot like below:

List<Object> values = new ArrayList<>();
values.add("0");
values.add("1");

Dataset<Row> pivot1 = mainDataset.groupBy(groupByColumns)
        .pivot("Decider", values)
        .agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector1")));
pivot1 = pivot1.withColumnRenamed("0", "Vector1_0");
pivot1 = pivot1.withColumnRenamed("1", "Vector1_1");

Count on pivot1 = 12856

Dataset<Row> pivot2 = mainDataset.groupBy(groupByColumns)
        .pivot("Decider", values)
        .agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector2")));
pivot2 = pivot2.withColumnRenamed("0", "Vector2_0");
pivot2 = pivot2.withColumnRenamed("1", "Vector2_1");

Count on pivot2 = 12856

Dataset<Row> finalDataset = pivot1.join(pivot2, Seq); // Seq of the same group-by columns

Count on finalDataset = 0? Why? This should be 12856, right?

The same code works locally with fewer columns and 100 records.

Is there anything I am missing here? Is there a better way to pivot multiple 
columns? I cannot combine them because my aggregation columns are arrays of 
doubles.

The pivot1 & pivot2 datasets are derived from the same parent dataset and the 
group-by columns are the same; all I am doing is an inner join of these two 
datasets on the same group-by columns, so why doesn't it work?
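
For what it's worth, the single-pass version I would like to end up with looks 
roughly like this (a Scala sketch, untested; the short group-by column list is 
only illustrative and CUSTOM_UDAF is assumed to be registered under that name):

import org.apache.spark.sql.functions.{callUDF, col}

// Illustrative subset of the 30 group-by columns
val groupCols = Seq("col1", "col2", "col3")

val pivoted = mainDataset
  .groupBy(groupCols.map(col): _*)
  .pivot("Decider", Seq("0", "1"))
  .agg(
    callUDF("CUSTOM_UDAF", col("Vector1")).as("Vector1"),
    callUDF("CUSTOM_UDAF", col("Vector2")).as("Vector2"))

// With two aggregations the pivoted columns come back named roughly
// 0_Vector1, 0_Vector2, 1_Vector1, 1_Vector2, so there is no second
// dataset and no inner join at all.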

Thank you
Anil Langote




spark inner join

2015-10-24 Thread kali.tumm...@gmail.com
Hi All, 

In SQL, say for example I have table1 (movieid) and table2 (movieid, moviename).
In SQL we write something like

select moviename, movieid, count(1) from table2 inner join table1 on
table1.movieid = table2.movieid group by moviename, movieid

Here table1 has only one column whereas table2 has two columns, and the join
still works. In the same way, can Spark join on keys from both RDDs?

When I tried to join two RDDs in Spark, both RDDs needed to have the same
number of elements per record, so I had to add a dummy value 0, for example.
Is there another way around this, or am I doing it completely wrong?

// Ratings file (ml-100k u.data): userId \t movieId \t rating \t timestamp
val lines = sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.data")
// Movie metadata file (ml-100k u.item): movieId | movieName | ...
val movienamesfile = sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.item")

// Key each rating by movieId, with a dummy 0 value so it becomes a pair RDD
val moviesid = lines.map(x => x.split("\t")).map(x => (x(1), 0))
val test = moviesid.map(x => x._1)

// (movieId, movieName) pairs
val movienames = movienamesfile.map(x => x.split("\\|")).map(x => (x(0), x(1)))

// Join the two pair RDDs on movieId
val movienamejoined = moviesid.join(movienames).distinct()
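
Keyed by movieId on both sides, the SQL above can also be reproduced without
the dummy 0 (a sketch, assuming the same ml-100k file layout as the code above):

// Count ratings per movieId, then join with the (movieId, title) pairs
val countsById = lines
  .map(_.split("\t"))
  .map(cols => (cols(1), 1))        // (movieId, 1)
  .reduceByKey(_ + _)               // (movieId, count)

val titlesById = movienamesfile
  .map(_.split("\\|"))
  .map(cols => (cols(0), cols(1)))  // (movieId, title)

// join needs pair RDDs on both sides, which is what the dummy value was for;
// here the count takes its place as the value
val withNames = countsById.join(titlesById)
  .map { case (id, (count, title)) => (title, id, count) }

withNames.take(10).foreach(println)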

Thanks
Sri


