[jira] [Updated] (SPARK-37321) Wrong size estimation leads to "Cannot broadcast the table that is larger than 8GB: 8 GB"

2021-11-14 Thread Izek Greenfield (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Izek Greenfield updated SPARK-37321:

Description: 
When CBO is enabled, Spark can end up trying to broadcast a very large 
DataFrame because the output size estimation is wrong.

 

In `EstimationUtils.getSizePerRow`, if there are no statistics, Spark falls 
back to `DataType.defaultSize`.

When the output contains `functions.concat_ws`, `getSizePerRow` estimates the 
resulting string column at 20 bytes (the `StringType` default), while in our 
case the actual values can be much larger.
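
For illustration, here is a minimal sketch of what the `defaultSize` fallback 
means for a string column such as the output of `concat_ws`. This is 
illustrative code only, not the actual `EstimationUtils` source; the helper 
name is made up and the real method also accounts for row overhead and 
available column statistics.
{code:scala}
// Simplified sketch of the defaultSize fallback described above
// (illustrative only, not the real EstimationUtils.getSizePerRow).
import org.apache.spark.sql.types._

def naiveSizePerRow(schema: StructType): Long =
  schema.fields.map(_.dataType.defaultSize.toLong).sum

val schema = StructType(Seq(
  StructField("index_b", StringType),
  StructField("l1", StringType)   // concat_ws output is a StringType column
))

// StringType.defaultSize is 20 bytes, so every string column is assumed to be
// ~20 bytes per row, no matter how long the concatenated value really is.
println(StringType.defaultSize)   // 20
println(naiveSizePerRow(schema))  // 40
{code}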

As a result, we sometimes end up with an estimated size of < 300 KB while the 
actual size is > 8 GB. This leads to exceptions: Spark decides the table is 
small enough to broadcast and only later discovers that the data is too large.
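
The mis-estimate can be observed directly on the optimized plan, and the join 
can be steered away from broadcasting as a workaround. A sketch using the 
DataFrames `a` and `b` from the repro below (the exact settings are 
illustrative, not taken from the original run):
{code:scala}
// Inspect what CBO thinks the join input is worth (tiny) versus reality (huge).
val joined = a.join(b, col("index") === col("index_b"))
println(joined.queryExecution.optimizedPlan.stats.sizeInBytes)

// Possible workarounds while the estimation is wrong: disable CBO, or disable
// automatic broadcast joins so the bad estimate cannot trigger a broadcast.
spark.conf.set("spark.sql.cbo.enabled", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
{code}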

 

Code sample to reproduce (I ran it with `-Xmx45G`):
{code:scala}
// Assumes a SparkSession named `spark`, e.g. the one provided by spark-shell.
import scala.util.Random

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{col, lit}

import spark.implicits._

// Write the two parquet inputs used below.
(1 to 10).toDF("index").withColumn("index", 
col("index").cast("string")).write.parquet("/tmp/a")
(1 to 1000).toDF("index_b").withColumn("index_b", 
col("index_b").cast("string")).write.parquet("/tmp/b")

// The small table: 10 rows plus a few short concat_ws columns.
val a = spark.read
   .parquet("/tmp/a")
   .withColumn("b", col("index"))
   .withColumn("l1", functions.concat_ws("/", col("index"), 
functions.current_date(), functions.current_date(), functions.current_date(), 
functions.current_date()))
   .withColumn("l2", functions.concat_ws("/", col("index"), 
functions.current_date(), functions.current_date(), functions.current_date(), 
functions.current_date()))
   .withColumn("l3", functions.concat_ws("/", col("index"), 
functions.current_date(), functions.current_date(), functions.current_date(), 
functions.current_date()))
   .withColumn("l4", functions.concat_ws("/", col("index"), 
functions.current_date(), functions.current_date(), functions.current_date(), 
functions.current_date()))
   .withColumn("l5", functions.concat_ws("/", col("index"), 
functions.current_date(), functions.current_date(), functions.current_date(), 
functions.current_date()))

// The big table: each concat_ws column below is built from ~2800 pairs of
// 220-character literals, so a single row ends up in the megabyte range.
val r = Random.alphanumeric  // stream of random alphanumeric characters
val l = 220                  // length of each literal fragment
val i = 2800                 // number of fragments per concatenated column

val b = spark.read
   .parquet("/tmp/b")
   .withColumn("l1", functions.concat_ws("/", (0 to i).flatMap(a => 
List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
   .withColumn("l2", functions.concat_ws("/", (0 to i).flatMap(a => 
List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
   .withColumn("l3", functions.concat_ws("/", (0 to i).flatMap(a => 
List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
   .withColumn("l4", functions.concat_ws("/", (0 to i).flatMap(a => 
List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
   .withColumn("l5", functions.concat_ws("/", (0 to i).flatMap(a => 
List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
   .withColumn("l6", functions.concat_ws("/", (0 to i).flatMap(a => 
List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
   .withColumn("l7", functions.concat_ws("/", (0 to i).flatMap(a => 
List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
 
// With CBO enabled the estimated sizes are tiny, so the planner picks a
// broadcast join; building the broadcast later fails with the 8 GB limit.
a.join(b, col("index") === col("index_b")).show(2000)
{code}
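
For the repro to hit this code path, CBO has to be enabled. The settings below 
are an assumed example, not necessarily the exact values I used:
{code:scala}
// CBO must be on for EstimationUtils-based plan statistics to drive the join
// strategy; the threshold value here is an assumption for illustration.
spark.conf.set("spark.sql.cbo.enabled", "true")
// Broadcast joins are picked when the estimated size is below this threshold,
// so the ~300 KB estimate easily qualifies even though the data is > 8 GB.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
{code}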
 


[jira] [Updated] (SPARK-37321) Wrong size estimation leads to "Cannot broadcast the table that is larger than 8GB: 8 GB"

2021-11-14 Thread Izek Greenfield (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Izek Greenfield updated SPARK-37321:

Component/s: SQL

> Wrong size estimation leads to "Cannot broadcast the table that is larger 
> than 8GB: 8 GB"
> -
>
> Key: SPARK-37321
> URL: https://issues.apache.org/jira/browse/SPARK-37321
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 3.1.1, 3.2.0
>Reporter: Izek Greenfield
>Priority: Major
>




[jira] [Updated] (SPARK-37321) Wrong size estimation leads to "Cannot broadcast the table that is larger than 8GB: 8 GB"

2021-11-14 Thread Izek Greenfield (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Izek Greenfield updated SPARK-37321:

Summary: Wrong size estimation leads to "Cannot broadcast the table that is 
larger than 8GB: 8 GB"  (was: Wrong size estimation that leads to "Cannot 
broadcast the table that is larger than 8GB: 8 GB")

> Wrong size estimation leads to "Cannot broadcast the table that is larger 
> than 8GB: 8 GB"
> -
>
> Key: SPARK-37321
> URL: https://issues.apache.org/jira/browse/SPARK-37321
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 3.1.1, 3.2.0
>Reporter: Izek Greenfield
>Priority: Major
>
> When CBO is enabled then a situation occurs where spark tries to broadcast 
> very large DataFrame due to wrong output size estimation.
>  
> In `EstimationUtils.getSizePerRow`, if there is no statistics then spark will 
> use `DataType.defaultSize`.
> In the case where the output contains `functions.concat_ws`, the 
> `getSizePerRow` function will estimate the size to be 20 bytes, while in our 
> case the actual size can be a lot larger.
> As a result, we in some cases end up with an estimated size of < 300K while 
> the actual size can be > 8GB, thus leading to exceptions as spark thinks the 
> tables may be broadcast but later realizes the data size is too large.
>  
> Code sample to reproduce:
> {code:scala}
> import spark.implicits._
> (1 to 10).toDF("index").withColumn("index", 
> col("index").cast("string")).write.parquet("/tmp/a")
> (1 to 1000).toDF("index_b").withColumn("index_b", 
> col("index_b").cast("string")).write.parquet("/tmp/b")
> val a = spark.read
>    .parquet("/tmp/a")
>    .withColumn("b", col("index"))
>    .withColumn("l1", functions.concat_ws("/", col("index"), 
> functions.current_date(), functions.current_date(), functions.current_date(), 
> functions.current_date()))
>    .withColumn("l2", functions.concat_ws("/", col("index"), 
> functions.current_date(), functions.current_date(), functions.current_date(), 
> functions.current_date()))
>    .withColumn("l3", functions.concat_ws("/", col("index"), 
> functions.current_date(), functions.current_date(), functions.current_date(), 
> functions.current_date()))
>    .withColumn("l4", functions.concat_ws("/", col("index"), 
> functions.current_date(), functions.current_date(), functions.current_date(), 
> functions.current_date()))
>    .withColumn("l5", functions.concat_ws("/", col("index"), 
> functions.current_date(), functions.current_date(), functions.current_date(), 
> functions.current_date()))
> val r = Random.alphanumeric
> val l = 220
> val i = 2800
> val b = spark.read
>    .parquet("/tmp/b")
>    .withColumn("l1", functions.concat_ws("/", (0 to i).flatMap(a => 
> List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>    .withColumn("l2", functions.concat_ws("/", (0 to i).flatMap(a => 
> List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>    .withColumn("l3", functions.concat_ws("/", (0 to i).flatMap(a => 
> List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>    .withColumn("l4", functions.concat_ws("/", (0 to i).flatMap(a => 
> List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>    .withColumn("l5", functions.concat_ws("/", (0 to i).flatMap(a => 
> List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>    .withColumn("l6", functions.concat_ws("/", (0 to i).flatMap(a => 
> List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>    .withColumn("l7", functions.concat_ws("/", (0 to i).flatMap(a => 
> List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>  
> a.join(b, col("index") === col("index_b")).show(2000)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org