Thank you, that’s absolutely right!
Getting the rows of `s` without matches in `p` is no longer a problem.
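
For the archive, roughly what the working query looks like (a sketch: p’s
partition filters moved into the join condition as suggested, so the
p.event_id is null check keeps exactly the non-matching rows):

select
    s.event_id as search_event_id,
    s.query_string
from s
left outer join p
    on s.event_id = p.source_event_id
    and p.year = 2020 and p.month = 4 and p.day = 29
where
    s.year = 2020 and s.month = 4 and s.day = 29
    and p.event_id is null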

Have a nice day
Roland


> On 30.04.2020 at 17:36, Ryan C. Kleck <rkl...@godaddy.com> wrote:
> 
> He’s saying you need to move the filters for the ‘p’ table in order to do 
> what you want: they need to go into the join condition, before your WHERE. 
> The order of operations in SQL applies your join clause filters before the 
> WHERE. The filters on your ‘s’ table need to stay in the WHERE. OUTER JOINs 
> are the only time this ordering matters.
> 
> Intuitively: why would you do an OUTER JOIN with your query? Because you 
> expect some things in ‘p’ to be NULL after your join. But your filter in the 
> WHERE gets rid of all the NULLs in ‘p’. You basically force it into an inner 
> join with that filter, and the planner recognizes that.
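> 
> In your case that would be something roughly like this (a sketch, using the 
> same tables and columns as your query):
> 
> select
>     s.event_id as search_event_id,
>     s.query_string,
>     p.event_id
> from s
> left outer join p
>     on s.event_id = p.source_event_id
>     and p.year = 2020 and p.month = 4 and p.day = 29
> where
>     s.year = 2020 and s.month = 4 and s.day = 29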
> 
> Ryan Kleck
> Software Developer IV
> Customer Knowledge Platform
> From: Roland Johann <roland.joh...@phenetic.io.INVALID>
> Sent: Thursday, April 30, 2020 8:30:05 AM
> To: randy clinton <randyclin...@gmail.com>
> Cc: Roland Johann <roland.joh...@phenetic.io.invalid>; user 
> <user@spark.apache.org>
> Subject: Re: Left Join at SQL query gets planned as inner join
> 
> Thanks for the quick reply.
> 
> It plans the LeftOuter as soon as the filters on the second table are 
> removed.
> 
>> It seems like you are asking for a left join, but your filters demand the 
>> behavior of an inner join. 
> Can you explain that?
> The filters on the second table use partition pruning so that we don’t have 
> to do a full table scan just to get the data for that one day of the second 
> table. To be more precise: we want to left outer join the two tables’ records 
> of the same day, joining on id properties, where the second table must not 
> contain records matching the join condition.
> 
>> s_DF = s_DF.filter("year = 2020 and month = 4 and day = 29")
>> p_DF = p_DF.filter("year = 2020 and month = 4 and day = 29 and event_id is null")
> That points to my second question: event_id is null should be applied after 
> the join, to get the records of the first table that don’t match any records 
> of the second table. The plan actually shows that it pushes that filter down 
> to Parquet and doesn’t select it otherwise, so the entire result set is empty 
> because of the additional inner join. The desired behavior can be achieved 
> with left anti joins, but that’s not the point, as the WHERE condition 
> behaves differently than one would expect.
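> 
> For comparison, the left anti join version would be something roughly like 
> this (a sketch, same tables as above):
> 
> select
>     s.event_id as search_event_id,
>     s.query_string
> from s
> left anti join p
>     on s.event_id = p.source_event_id
>     and p.year = 2020 and p.month = 4 and p.day = 29
> where
>     s.year = 2020 and s.month = 4 and s.day = 29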
> 
> I hope it doesn’t get confusing that we are talking about two different, but 
> somewhat related, problems within a single thread.
> 
> Best
> Roland
> 
> 
>> On 30.04.2020 at 17:20, randy clinton <randyclin...@gmail.com> wrote:
>> 
>> Does it still plan an inner join if you remove a filter on both tables?
>> 
>> It seems like you are asking for a left join, but your filters demand the 
>> behavior of an inner join. 
>> 
>> Maybe you could do the filters on the tables first and then join them.
>> 
>> Something roughly like..
>> 
>> # filter each side first, then left join the pre-filtered frames
>> s_DF = s_DF.filter("year = 2020 and month = 4 and day = 29")
>> p_DF = p_DF.filter("year = 2020 and month = 4 and day = 29 and event_id is null")
>> 
>> output = s_DF.join(p_DF, s_DF["event_id"] == p_DF["source_event_id"], "left")
>> 
>> 
>> 
>> On Thu, Apr 30, 2020 at 11:06 AM Roland Johann 
>> <roland.joh...@phenetic.io.invalid> wrote:
>> Hi All,
>> 
>> 
>> we are on vanilla Spark 2.4.4 and currently experience a somewhat strange 
>> behavior of the query planner/optimizer, and therefore get wrong results.
>> 
>> select
>>     s.event_id as search_event_id,
>>     s.query_string,
>>     p.event_id
>> from s
>> left outer join p on s.event_id = p.source_event_id
>> where
>>     s.year = 2020 and s.month = 4 and s.day = 29
>>     and p.year = 2020 and p.month = 4 and p.day = 29
>> limit 1
>> This query leads to this plan:
>> *(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, 
>> event_id#12209]
>> +- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, 
>> BuildLeft
>>    :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
>> true]))
>>    :  +- *(1) Project [event_id#12131, query_string#12178]
>>    :     +- *(1) Filter isnotnull(event_id#12131)
>>    :        +- *(1) FileScan parquet 
>> s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] 
>> Batched: true, Format: Parquet, Location: 
>> PrunedInMemoryFileIndex[hdfs://<some-path>/search/year=2020/month=4/day=29/...,
>>  PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), 
>> isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), 
>> (month..., PushedFilters: [IsNotNull(event_id)], ReadSchema: 
>> struct<event_id:string,query_string:string>
>>    +- *(2) Project [event_id#12209, source_event_id#12221]
>>       +- *(2) Filter isnotnull(source_event_id#12221)
>>          +- *(2) FileScan parquet 
>> s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] 
>> Batched: true, Format: Parquet, Location: 
>> PrunedInMemoryFileIndex[hdfs://<some-path>/p/year=2020/month=4/day=2..., 
>> PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), 
>> isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), 
>> (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: 
>> struct<event_id:string,source_event_id:string>
>> Without partition pruning the join gets planned as LeftOuter with 
>> SortMergeJoin, but we need partition pruning in this case to prevent full 
>> table scans and to profit from the broadcast join...
>> 
>> As soon as we rewrite the query with the Scala DSL, the plan looks fine:
>> val s = spark.sql("select event_id, query_string from ssi_kpi.search where 
>> year = 2020 and month = 4 and day = 29")
>> val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show 
>> where year = 2020 and month = 4 and day = 29")
>> 
>> s
>>   .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
>>   .groupBy(s("query_string"))
>>   .agg(count(s("query_string")), count(p("event_id")))
>>   .show()
>> 
>> 
>> The second thing we saw is that conditions in the WHERE clause on joined 
>> tables get pushed down to the Parquet files and lead to wrong results, for 
>> example:
>> select
>>     s.event_id as search_event_id,
>>     s.query_string,
>>     p.event_id
>> from s
>> left outer join p on s.event_id = p.source_event_id
>> where
>>     s.year = 2020 and s.month = 4 and s.day = 29
>>     and p.year = 2020 and p.month = 4 and p.day = 29
>>     and p.event_id is null
>> Until now I assumed that string-based queries and the Scala DSL lead to the 
>> same execution plan. Can someone point me to docs about the internals of 
>> this part of Spark? The official docs about SQL in general are not that 
>> verbose.
>> 
>> Thanks in advance and stay safe!
>> 
>> Roland Johann
>> 
>> 
>> -- 
>> I appreciate your time,
>> 
>> ~Randy
>> 
>> randyclin...@gmail.com
