He’s saying you need to move the filters on the ‘p’ table in order to do what 
you want. They need to go into the join condition (the ON clause), before your 
WHERE. SQL’s order of operations applies your join clause filters before the 
WHERE. The filters on your ‘s’ table need to stay in the WHERE. This placement 
only matters when you are doing OUTER JOINs.
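
Roughly, for the first query in your original mail, that would look something 
like this (untested sketch, using your own table and column names):

select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
from s
left outer join p
    on s.event_id = p.source_event_id
    and p.year = 2020 and p.month = 4 and p.day = 29
where
    s.year = 2020 and s.month = 4 and s.day = 29
limit 1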

Intuitively, why would you do an OUTER JOIN with your query? It means you 
expect some rows from ‘p’ to be NULL after the join. But your filter in the 
WHERE gets rid of all the NULLs in ‘p’. You basically force it into an inner 
join with that filter, and the planner recognizes that.
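
Put differently, with the partition filters on ‘p’ in the WHERE, your left 
outer join is semantically the same as something like this inner join, which 
is what the plan in your original mail shows:

select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
from s
inner join p on s.event_id = p.source_event_id
where
    s.year = 2020 and s.month = 4 and s.day = 29
    and p.year = 2020 and p.month = 4 and p.day = 29
limit 1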

Ryan Kleck
Software Developer IV
Customer Knowledge Platform
________________________________
From: Roland Johann <roland.joh...@phenetic.io.INVALID>
Sent: Thursday, April 30, 2020 8:30:05 AM
To: randy clinton <randyclin...@gmail.com>
Cc: Roland Johann <roland.joh...@phenetic.io.invalid>; user 
<user@spark.apache.org>
Subject: Re: Left Join at SQL query gets planned as inner join

Thanks for the quick reply.

It plans the LeftOuter as soon as the filters on the second table are removed.

It seems like you are asking for a left join, but your filters demand the 
behavior of an inner join.
Can you explain that?
The filters on the second table use partition pruning, so that we don’t have to 
do a full table scan and only read the data for that one day of the second 
table. To be more precise: we want to left outer join the records of the two 
tables for the same day, joining on id properties, where the second table must 
not contain records matching the join condition.

s_DF = s_DF.filter("year = 2020 and month = 4 and day = 29")
p_DF = p_DF.filter("year = 2020 and month = 4 and day = 29 and event_id is null")
That points to my second question: the event_id is null filter should be 
applied after the join, to get the records of the first table that don’t match 
any records of the second table. The plan actually shows that it pushes that 
filter down to Parquet and doesn’t select it anyway, so the entire result set 
is empty because of the additional inner join. The desired behavior can be 
achieved with left anti joins, but that’s not the point, as the where condition 
behaves differently than one would expect.
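
For reference, the left anti join variant would look roughly like this 
(untested sketch; note that an anti join only returns columns of the first 
table, so p.event_id cannot be selected):

select
    s.event_id as search_event_id,
    s.query_string
from s
left anti join p
    on s.event_id = p.source_event_id
    and p.year = 2020 and p.month = 4 and p.day = 29
where
    s.year = 2020 and s.month = 4 and s.day = 29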

I hope it doesn’t get confusing that we are talking about two different, but 
somewhat related, problems within a single thread.

Best
Roland


On 30.04.2020 at 17:20, randy clinton <randyclin...@gmail.com> wrote:

Does it still plan an inner join if you remove a filter on both tables?

It seems like you are asking for a left join, but your filters demand the 
behavior of an inner join.

Maybe you could do the filters on the tables first and then join them.

Something roughly like:

s_DF = s_DF.filter("year = 2020 and month = 4 and day = 29")
p_DF = p_DF.filter("year = 2020 and month = 4 and day = 29 and event_id is null")

output = s_DF.join(p_DF, s_DF["event_id"] == p_DF["source_event_id"], "left")



On Thu, Apr 30, 2020 at 11:06 AM Roland Johann 
<roland.joh...@phenetic.io.invalid> wrote:
Hi All,


We are on vanilla Spark 2.4.4 and are currently experiencing somewhat strange 
behavior of the query planner/optimizer, and therefore get wrong results.


select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
    s.year = 2020 and s.month = 4 and s.day = 29
    and p.year = 2020 and p.month = 4 and p.day = 29
limit 1

This query leads to the following plan:

*(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, event_id#12209]
+- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   :  +- *(1) Project [event_id#12131, query_string#12178]
   :     +- *(1) Filter isnotnull(event_id#12131)
   :        +- *(1) FileScan parquet s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/search/year=2020/month=4/day=29/..., PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., PushedFilters: [IsNotNull(event_id)], ReadSchema: struct<event_id:string,query_string:string>
   +- *(2) Project [event_id#12209, source_event_id#12221]
      +- *(2) Filter isnotnull(source_event_id#12221)
         +- *(2) FileScan parquet s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/p/year=2020/month=4/day=2..., PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: struct<event_id:string,source_event_id:string>

Without partition pruning the join gets planned as LeftOuter with 
SortMergeJoin, but we need partition pruning in this case to prevent full table 
scans and to benefit from the broadcast join...

As soon as we rewrite the query with the Scala DSL, the plan looks fine:

val s = spark.sql("select event_id, query_string from ssi_kpi.search where year = 2020 and month = 4 and day = 29")
val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where year = 2020 and month = 4 and day = 29")

s
  .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
  .groupBy(s("query_string"))
  .agg(count(s("query_string")), count(p("event_id")))
  .show()


The second thing we saw is that conditions in the where clause on joined tables 
get pushed down to the Parquet files and lead to wrong results, for example:

select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
    s.year = 2020 and s.month = 4 and s.day = 29
    and p.year = 2020 and p.month = 4 and p.day = 29
    and p.event_id is null
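
What we would expect to have to write instead, roughly, is to move the 
partition filters on p into the join condition and keep only the null check in 
the WHERE, something like (untested sketch):

select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
from s
left outer join p
    on s.event_id = p.source_event_id
    and p.year = 2020 and p.month = 4 and p.day = 29
where
    s.year = 2020 and s.month = 4 and s.day = 29
    and p.event_id is null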

Until now I assumed that string-based queries and the Scala DSL lead to the 
same execution plan. Can someone point me to docs about the internals of this 
part of Spark? The official docs about SQL in general are not that verbose.

Thanks in advance and stay safe!

Roland Johann


--
I appreciate your time,

~Randy

randyclin...@gmail.com
