Thanks for the quick reply.

It plans the LeftOuter join as soon as the filters on the second table are 
removed.

> It seems like you are asking for a left join, but your filters demand the 
> behavior of an inner join. 
Can you explain that?
The filters on the second table use partition pruning, so we don't have to do 
a full table scan just to get the data for that one day of the second table. 
To be more precise: we want to left outer join the records of the two tables 
for the same day on their id properties, keeping the records of the first 
table for which the second table contains no record matching the join 
condition.
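
To make that concrete, here is a minimal sketch in the Scala DSL of what we 
intend (assuming DataFrames s and p for the two tables, with the partition 
columns year/month/day and the id columns from the queries further down):

import org.apache.spark.sql.functions.col

// restrict both tables to the same day, so only that partition gets scanned
val sDay = s.filter(col("year") === 2020 && col("month") === 4 && col("day") === 29)
val pDay = p.filter(col("year") === 2020 && col("month") === 4 && col("day") === 29)

// left outer join on the id properties; the "second table has no matching
// record" condition has to be evaluated after the join, on the null-extended
// right side
val unmatched = sDay
  .join(pDay, sDay("event_id") === pDay("source_event_id"), "left_outer")
  .filter(pDay("event_id").isNull)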

> s_DF = s_DF.filter("year = 2020 and month = 4 and day = 29")
> p_DF = p_DF.filter("year = 2020 and month = 4 and day = 29 and event_id is null")
That points to my second question: the event_id is null check should be 
applied after the join, to get the records of the first table that don't 
match any record of the second table. The plan actually shows that this 
filter gets pushed down to Parquet, so it is applied before the join, and the 
entire result set ends up empty, caused by the additional inner join. The 
desired behavior can be achieved with left anti joins, but that's not the 
point, as the where condition behaves differently than one would expect.
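
For completeness, the left anti join variant would look roughly like this 
(same assumed DataFrames sDay and pDay as in the sketch above):

// left_anti keeps exactly the rows of sDay that have no match in pDay;
// note that the result only contains columns of sDay
val noMatch = sDay.join(pDay, sDay("event_id") === pDay("source_event_id"), "left_anti")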

I hope it doesn't get confusing that we are talking about two different, but 
somewhat related, problems within a single thread.

Best
Roland


> On 30.04.2020 at 17:20, randy clinton <randyclin...@gmail.com> wrote:
> 
> Does it still plan an inner join if you remove a filter on both tables?
> 
> It seems like you are asking for a left join, but your filters demand the 
> behavior of an inner join. 
> 
> Maybe you could do the filters on the tables first and then join them.
> 
> Something roughly like..
> 
> s_DF = s_DF.filter("year = 2020 and month = 4 and day = 29")
> p_DF = p_DF.filter("year = 2020 and month = 4 and day = 29 and event_id is null")
> 
> output = s_DF.join(p_DF, s_DF["event_id"] == p_DF["source_event_id"], "left")
> 
> 
> 
> On Thu, Apr 30, 2020 at 11:06 AM Roland Johann 
> <roland.joh...@phenetic.io.invalid> wrote:
> Hi All,
> 
> 
> we are on vanilla Spark 2.4.4 and currently experience somewhat strange 
> behavior of the query planner/optimizer, and therefore get wrong results.
> 
> select
>     s.event_id as search_event_id,
>     s.query_string,
>     p.event_id
> from s
> left outer join p on s.event_id = p.source_event_id
> where
>     s.year = 2020 and s.month = 4 and s.day = 29
>     and p.year = 2020 and p.month = 4 and p.day = 29
> limit 1
> This query leads to the following plan:
> *(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, event_id#12209]
> +- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, BuildLeft
>    :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
>    :  +- *(1) Project [event_id#12131, query_string#12178]
>    :     +- *(1) Filter isnotnull(event_id#12131)
>    :        +- *(1) FileScan parquet s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/search/year=2020/month=4/day=29/..., PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., PushedFilters: [IsNotNull(event_id)], ReadSchema: struct<event_id:string,query_string:string>
>    +- *(2) Project [event_id#12209, source_event_id#12221]
>       +- *(2) Filter isnotnull(source_event_id#12221)
>          +- *(2) FileScan parquet s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/p/year=2020/month=4/day=2..., PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: struct<event_id:string,source_event_id:string>
> Without partition pruning the join gets planned as LeftOuter with 
> SortMergeJoin, but we need partition pruning in this case to prevent full 
> table scans and to profit from the broadcast join...
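> 
> For illustration, the variant without the predicates on p that does get 
> planned as LeftOuter is simply (sketch, same tables as above):
> 
> spark.sql("""
>   select s.event_id as search_event_id, s.query_string, p.event_id
>   from s
>   left outer join p on s.event_id = p.source_event_id
>   where s.year = 2020 and s.month = 4 and s.day = 29
> """).explain()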
> 
> As soon as we rewrite the query in the Scala DSL, the plan looks fine:
> val s = spark.sql("select event_id, query_string from ssi_kpi.search where year = 2020 and month = 4 and day = 29")
> val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where year = 2020 and month = 4 and day = 29")
> 
> import org.apache.spark.sql.functions.count
> 
> s
>   .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
>   .groupBy(s("query_string"))
>   .agg(count(s("query_string")), count(p("event_id")))
>   .show()
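> 
> To double check which join got planned, printing the plan of the joined 
> Dataset shows the LeftOuter join here, e.g.:
> 
> s.join(p, s("event_id") <=> p("source_event_id"), "left_outer").explain()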
> 
> 
> The second thing we saw is that conditions in the where clause on joined 
> tables get pushed down to the Parquet files and lead to wrong results, for 
> example:
> select
>     s.event_id as search_event_id,
>     s.query_string,
>     p.event_id
> from s
> left outer join p on s.event_id = p.source_event_id
> where
>     s.year = 2020 and s.month = 4 and s.day = 29
>     and p.year = 2020 and p.month = 4 and p.day = 29
>     and p.event_id is null
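> 
> What we actually intend with the is null condition is a filter after the 
> outer join, i.e. in the DSL roughly (with s and p as defined above):
> 
> s
>   .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
>   .filter(p("event_id").isNull)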
> Until now I assumed that string-based queries and the Scala DSL lead to the 
> same execution plan. Can someone point me to docs about the internals of 
> this part of Spark? The official docs about SQL in general are not that 
> verbose.
> 
> Thanks in advance and stay safe!
> 
> Roland Johann
> 
> 
> -- 
> I appreciate your time,
> 
> ~Randy
> 
> randyclin...@gmail.com
