Izek Greenfield created SPARK-37321: ---------------------------------------
             Summary: Wrong size estimation that leads to "Cannot broadcast the table that is larger than 8GB: 8 GB"
                 Key: SPARK-37321
                 URL: https://issues.apache.org/jira/browse/SPARK-37321
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 3.2.0, 3.1.1
            Reporter: Izek Greenfield

When CBO is enabled, Spark may try to broadcast a very large DataFrame because of a wrong output-size estimation.

In `EstimationUtils.getSizePerRow`, if there are no column statistics, Spark falls back to `DataType.defaultSize`. When the output contains `functions.concat_ws`, `getSizePerRow` estimates each resulting string column at 20 bytes (the `defaultSize` of `StringType`), while in our case the actual strings can be far larger. As a result, we sometimes end up with an estimated size of < 300 KB while the actual size is > 8 GB. Spark then decides the table is small enough to broadcast, and the query fails at runtime once the real data size is discovered.

Code sample to reproduce:

{code:scala}
import scala.util.Random

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{col, lit}

import spark.implicits._

(1 to 100000).toDF("index").withColumn("index", col("index").cast("string")).write.parquet("/tmp/a")
(1 to 1000).toDF("index_b").withColumn("index_b", col("index_b").cast("string")).write.parquet("/tmp/b")

val a = spark.read
  .parquet("/tmp/a")
  .withColumn("b", col("index"))
  .withColumn("l1", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
  .withColumn("l2", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
  .withColumn("l3", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
  .withColumn("l4", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
  .withColumn("l5", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))

val r = Random.alphanumeric
val l = 220
val i = 2800

val b = spark.read
  .parquet("/tmp/b")
  .withColumn("l1", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l2", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l3", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l4", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l5", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l6", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l7", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))

a.join(b, col("index") === col("index_b")).show(2000)
{code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
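To illustrate the estimation gap described above: a rough, self-contained sketch of the statistics-free fallback in `EstimationUtils.getSizePerRow` (not the actual Spark code; the 8-byte row overhead and the per-type constants 4 for `IntegerType` and 20 for `StringType` are taken from Spark's `defaultSize` values, hard-coded here only for illustration):

{code:scala}
// Sketch of per-row size estimation without column statistics:
// every attribute contributes a fixed, type-based defaultSize,
// so long concat_ws strings count the same 20 bytes as short ones.
object SizeEstimationSketch {
  val intDefaultSize = 4L     // IntegerType.defaultSize in Spark
  val stringDefaultSize = 20L // StringType.defaultSize in Spark

  // 8 bytes of per-row overhead plus a fixed default per column,
  // mirroring the shape of getSizePerRow's no-stats branch.
  def estimatedRowSize(stringCols: Int, intCols: Int): Long =
    8L + stringCols * stringDefaultSize + intCols * intDefaultSize

  def main(args: Array[String]): Unit = {
    // 7 concat_ws string columns: estimated at 8 + 7 * 20 = 148
    // bytes per row, even when each real row is megabytes long.
    println(estimatedRowSize(stringCols = 7, intCols = 0))
  }
}
{code}

With the reproduction above, each row of `b` actually holds 7 strings of several megabytes, yet the estimate stays at 148 bytes per row, which is why the plan ends up below the broadcast threshold.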