[ 
https://issues.apache.org/jira/browse/SPARK-34369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Srinivas Rishindra Pothireddi updated SPARK-34369:
--------------------------------------------------
    Description: 
Users often hit a scenario where even a modest skew in a join leads to tasks 
that appear to be stuck, because a join considers all pairs of rows with 
matching keys, which is O(n^2) in the number of rows sharing a key. When this 
happens, users think that Spark has deadlocked. If there is a bound condition 
(an additional join predicate beyond the key equality, such as f.c > p.c in 
the examples below), the "number of output rows" metric may look typical. 
Other metrics (e.g. shuffle read) may look very modest. In those cases it is 
very hard to understand what the problem is: there is no conclusive proof 
without taking a heap dump and inspecting internal data structures.

It would be much better if Spark had a metric (which we propose be titled 
“number of matched pairs”, as a companion to “number of output rows”) that 
shows the user how many pairs are being processed in the join. It would be 
updated in the live UI (when metrics are collected during heartbeats), so the 
user could easily see what is going on.

The metric would help even when a stuck executor has some other cause (e.g. 
network issues), because it lets the user rule out join skew. For example, you 
may have 100k records with the same key on each side of a join. That probably 
won't show up as extreme skew in task input data, but it becomes 10B join 
pairs (100k x 100k) that Spark works through in a single task.
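
The arithmetic behind that example can be sketched in plain Scala, without Spark. This is only an illustration of what the proposed metric would count, assuming it equals the sum over join keys of (left rows with that key) x (right rows with that key). The object and function names are hypothetical and are not part of Spark.

```scala
object MatchedPairsSketch {
  /** Number of (left, right) row pairs an equi-join has to consider:
    * the sum over keys of (left rows with key) * (right rows with key).
    * Hypothetical helper for illustration only. */
  def matchedPairs[K](leftKeys: Seq[K], rightKeys: Seq[K]): Long = {
    val leftCounts: Map[K, Long] =
      leftKeys.groupBy(identity).map { case (k, rows) => k -> rows.size.toLong }
    val rightCounts: Map[K, Long] =
      rightKeys.groupBy(identity).map { case (k, rows) => k -> rows.size.toLong }
    // Keys that are missing on the right side contribute no pairs.
    leftCounts.iterator.map { case (k, n) => n * rightCounts.getOrElse(k, 0L) }.sum
  }

  def main(args: Array[String]): Unit = {
    // 100k records with the same key on each side of the join.
    val left  = Seq.fill(100000)(42L)
    val right = Seq.fill(100000)(42L)
    println(matchedPairs(left, right)) // prints 10000000000 (10B)
  }
}
```

Each input here holds only 100k keys, which would look unremarkable as task input, yet the pair count is five orders of magnitude larger.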

 

To further demonstrate the usefulness of this metric, run the following steps 
in a Spark shell.

 

    _val df1 = spark.range(0, 200000).map { x => (x % 20, 20) }.toDF("b", "c")_

    _val df2 = spark.range(0, 300000).map { x => (77, 20) }.toDF("b", "c")_

 

    _val df3 = spark.range(0, 200000).map(x => (x + 1, x + 2)).toDF("b", "c")_

    _val df4 = spark.range(0, 300000).map(x => (77, x + 2)).toDF("b", "c")_

 

    _val df5 = df1.union(df2)_

    _val df6 = df3.union(df4)_

 

    _df5.createOrReplaceTempView("table1")_

    _df6.createOrReplaceTempView("table2")_
h3. InnerJoin

_spark.sql("select p.*, f.* from table2 p join table1 f on f.b = p.b and f.c > p.c").count_

_number of output rows: 5,580,000_

_number of matched pairs: 90,000,490,000_
h3. FullOuterJoin

_spark.sql("select p.*, f.* from table2 p full outer join table1 f on f.b = p.b and f.c > p.c").count_

_number of output rows: 6,099,964_

_number of matched pairs: 90,000,490,000_
h3. LeftOuterJoin

_spark.sql("select p.*, f.* from table2 p left outer join table1 f on f.b = p.b and f.c > p.c").count_

_number of output rows: 6,079,964_

_number of matched pairs: 90,000,490,000_
h3. RightOuterJoin

_spark.sql("select p.*, f.* from table2 p right outer join table1 f on f.b = p.b and f.c > p.c").count_

_number of output rows: 5,600,000_

_number of matched pairs: 90,000,490,000_
h3. LeftSemiJoin

_spark.sql("select * from table2 p left semi join table1 f on f.b = p.b and f.c > p.c").count_

_number of output rows: 36_

_number of matched pairs: 89,994,910,036_
h3. CrossJoin

_spark.sql("select p.*, f.* from table2 p cross join table1 f on f.b = p.b and f.c > p.c").count_

_number of output rows: 5,580,000_

_number of matched pairs: 90,000,490,000_
h3. LeftAntiJoin

_spark.sql("select * from table2 p anti join table1 f on f.b = p.b and f.c > p.c").count_

number of output rows: 499,964

number of matched pairs: 89,994,910,036
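
The counts quoted above can be cross-checked without Spark by rebuilding table1 and table2 as plain Scala collections and counting per key. This is a sketch under stated assumptions: the object and helper names are hypothetical, and the semi/anti join pair count in the last assertion is only an arithmetic observation about this data set (every table1 row has c = 20, so a table2 row that matches does so on the first pair tried), not a description of how Spark implements those joins.

```scala
object JoinCountsCheck {
  // Same rows as table1 (df5) and table2 (df6), as plain (b, c) tuples.
  val table1: Seq[(Long, Long)] =
    (0L until 200000L).map(x => (x % 20, 20L)) ++
      (0L until 300000L).map(_ => (77L, 20L))
  val table2: Seq[(Long, Long)] =
    (0L until 200000L).map(x => (x + 1, x + 2)) ++
      (0L until 300000L).map(x => (77L, x + 2))

  // table1 (alias f): for each key b, how many rows carry each value of c.
  val fHist: Map[Long, Map[Long, Long]] =
    table1.groupBy(_._1).map { case (b, rows) =>
      b -> rows.groupBy(_._2).map { case (c, rs) => c -> rs.size.toLong }
    }

  // table2 (alias p): smallest c per key, used to find unmatched f rows.
  val pMinC: Map[Long, Long] =
    table2.groupBy(_._1).map { case (b, rows) => b -> rows.map(_._2).min }

  // Number of f rows pairing with one p row: by key only, and with f.c > p.c.
  def keyMatches(b: Long): Long =
    fHist.get(b).map(_.valuesIterator.sum).getOrElse(0L)
  def condMatches(b: Long, pc: Long): Long =
    fHist.get(b)
      .map(_.iterator.collect { case (fc, n) if fc > pc => n }.sum)
      .getOrElse(0L)

  val matchedPairs: Long = table2.iterator.map { case (b, _) => keyMatches(b) }.sum
  val innerRows: Long    = table2.iterator.map { case (b, pc) => condMatches(b, pc) }.sum
  val semiRows: Long     = table2.count { case (b, pc) => condMatches(b, pc) > 0 }.toLong
  val antiRows: Long     = table2.size - semiRows
  // f rows with no p row on the same key satisfying f.c > p.c.
  val unmatchedF: Long =
    table1.count { case (b, fc) => !pMinC.get(b).exists(_ < fc) }.toLong

  def main(args: Array[String]): Unit = {
    assert(matchedPairs == 90000490000L)                      // inner, outer, cross
    assert(innerRows == 5580000L)                             // inner and cross join
    assert(innerRows + antiRows == 6079964L)                  // left outer join
    assert(innerRows + unmatchedF == 5600000L)                // right outer join
    assert(innerRows + antiRows + unmatchedF == 6099964L)     // full outer join
    assert(semiRows == 36L)                                   // left semi join
    assert(antiRows == 499964L)                               // left anti join
    // Observation only: failing pairs plus one pair per matching p row.
    assert(matchedPairs - innerRows + semiRows == 89994910036L)
    println("all counts agree with the figures in the description")
  }
}
```

In every join type the matched-pair count is four or more orders of magnitude larger than the output row count, which is exactly the gap the proposed metric is meant to expose.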


> Track number of pairs processed out of Join
> -------------------------------------------
>
>                 Key: SPARK-34369
>                 URL: https://issues.apache.org/jira/browse/SPARK-34369
>             Project: Spark
>          Issue Type: New Feature
>          Components: Web UI
>    Affects Versions: 3.2.0
>            Reporter: Srinivas Rishindra Pothireddi
>            Priority: Major



