Hi All,

we are on vanilla Spark 2.4.4 and currently experience a somewhat strange
behavior of the query planner/optimizer, which gives us wrong results.

select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
    s.year = 2020 and s.month = 4 and s.day = 29
    and p.year = 2020 and p.month = 4 and p.day = 29
limit 1
This query leads to the following plan:
*(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, event_id#12209]
+- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   :  +- *(1) Project [event_id#12131, query_string#12178]
   :     +- *(1) Filter isnotnull(event_id#12131)
   :        +- *(1) FileScan parquet s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/search/year=2020/month=4/day=29/..., PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., PushedFilters: [IsNotNull(event_id)], ReadSchema: struct<event_id:string,query_string:string>
   +- *(2) Project [event_id#12209, source_event_id#12221]
      +- *(2) Filter isnotnull(source_event_id#12221)
         +- *(2) FileScan parquet p[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/p/year=2020/month=4/day=2..., PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: struct<event_id:string,source_event_id:string>
Note that although the query asks for a LEFT OUTER JOIN, the plan above
contains an Inner BroadcastHashJoin. Without the partition predicates in the
WHERE clause, the join gets planned as LeftOuter with a SortMergeJoin, but we
need partition pruning in this case to prevent full table scans and to profit
from the broadcast join...
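One workaround that should keep the left-outer semantics while still pruning
partitions seems to be moving p's partition predicates into a subquery, so
they are applied before the outer join (a sketch, using the tables from
above):

// Sketch: apply p's partition predicates in a subquery, below the outer
// join. A WHERE filter on the nullable side of a left outer join discards
// the null-extended rows anyway, which is what allows the optimizer to
// rewrite the join as Inner in the plan above.
val result = spark.sql("""
  select
      s.event_id as search_event_id,
      s.query_string,
      p.event_id
  from s
  left outer join (
      select event_id, source_event_id
      from p
      where year = 2020 and month = 4 and day = 29
  ) p on s.event_id = p.source_event_id
  where s.year = 2020 and s.month = 4 and s.day = 29
""")
result.explain()

The same predicates inside the subquery should still prune p's partitions,
but since they sit below the join they no longer reject the null-extended
rows of the outer join.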

As soon as we rewrite the query with the Scala DSL, the plan looks fine:
import org.apache.spark.sql.functions.count

val s = spark.sql("select event_id, query_string from ssi_kpi.search where year = 2020 and month = 4 and day = 29")
val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where year = 2020 and month = 4 and day = 29")

s
  .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
  .groupBy(s("query_string"))
  .agg(count(s("query_string")), count(p("event_id")))
  .show()
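To compare the two variants, the plans can be printed directly with explain
(assuming the s and p DataFrames from above):

// Prints the parsed, analyzed, optimized, and physical plans; for the DSL
// variant the join stays LeftOuter, while the SQL variant above is
// rewritten to Inner.
s.join(p, s("event_id") <=> p("source_event_id"), "left_outer")
  .explain(true)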


The second thing we saw is that conditions in the WHERE clause on joined
tables get pushed down to the Parquet files and lead to wrong results, for
example:
select
    s.event_id as search_event_id,
    s.query_string,
    p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
    s.year = 2020 and s.month = 4 and s.day = 29
    and p.year = 2020 and p.month = 4 and p.day = 29
    and p.event_id is null
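If the intent of this query is "search events without a matching p row", a
formulation that seems to side-step the problematic pushdown is a left anti
join (a sketch, reusing the pre-filtered s and p DataFrames from the Scala
example above):

// Sketch: express "s rows with no matching p row" as a left_anti join
// instead of filtering on p.event_id is null after a left outer join.
val unmatched = s.join(p, s("event_id") === p("source_event_id"), "left_anti")
unmatched.select("event_id", "query_string").show()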
Until now I assumed that string-based SQL queries and the Scala DSL lead to
the same execution plan. Can someone point me to docs about the internals of
this part of Spark? The official docs about SQL in general are not very
detailed on this.
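In the meantime, the stage at which the join type changes can be observed via
queryExecution (a sketch, reusing the first query from above):

val df = spark.sql("""
  select
      s.event_id as search_event_id,
      s.query_string,
      p.event_id
  from s
  left outer join p on s.event_id = p.source_event_id
  where s.year = 2020 and s.month = 4 and s.day = 29
    and p.year = 2020 and p.month = 4 and p.day = 29
""")
println(df.queryExecution.analyzed)      // logical plan after analysis: join is still LeftOuter
println(df.queryExecution.optimizedPlan) // after the optimizer: join has become Inner
println(df.queryExecution.executedPlan)  // physical plan: Inner BroadcastHashJoin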

Thanks in advance and stay safe!

Roland Johann
