[ 
https://issues.apache.org/jira/browse/SPARK-44768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753035#comment-17753035
 ] 

Franck Tago commented on SPARK-44768:
-------------------------------------

!image-2023-08-10-20-32-55-684.png!

> Improve WSCG handling of row buffer by accounting for executor memory  .  
> Exploding nested arrays can easily lead to out of memory errors. 
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-44768
>                 URL: https://issues.apache.org/jira/browse/SPARK-44768
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.3.2, 3.4.0, 3.4.1
>            Reporter: Franck Tago
>            Priority: Major
>         Attachments: image-2023-08-10-20-32-55-684.png, 
> spark-jira_wscg_code.txt
>
>
> consider a scenario where you flatten  a nested array 
> // e.g you can use the following steps to create the dataframe 
> //create a partClass case class
> case class partClass (PARTNAME: String , PartNumber: String , PartPrice : 
> Double )
> //create a nested array array class
> case  class array_array_class (
>  col_int: Int,
>  arr_arr_string : Seq[Seq[String]],
>  arr_arr_bigint : Seq[Seq[Long]],
>  col_string     : String,
>  parts_s        : Seq[Seq[partClass]]
>  
> )
> //create a dataframe
> var df_array_array = sc.parallelize(
>  Seq(
>  (1,Seq(Seq("a","b" ,"c" ,"d") ,Seq("aa","bb" ,"cc","dd")) , 
> Seq(Seq(1000,20000), Seq(30000,-10000)),"ItemPart1",
>   Seq(Seq(partClass("PNAME1","P1",20.75),partClass("PNAME1_1","P1_1",30.75)))
>  ) ,
>  
>  (2,Seq(Seq("ab","bc" ,"cd" ,"de") ,Seq("aab","bbc" 
> ,"ccd","dde"),Seq("aaaaaabbbbb")) , Seq(Seq(-1000,-20000,-1,-2), 
> Seq(0,30000,-10000)),"ItemPart2",
>   
> Seq(Seq(partClass("PNAME2","P2",170.75),partClass("PNAME2_1","P2_1",33.75),partClass("PNAME2_2","P2_2",73.75)))
>  )
>   
>  )
> ).toDF("c1" ,"c2" ,"c3" ,"c4" ,"c5")
> //explode a nested array 
> var  result   =  df_array_array.select( col("c1"), 
> explode(col("c2"))).select('c1 , explode('col))
> result.explain
>  
> The physical for this operator is seen below.
> -------------------------------------
> Physical plan 
> == Physical Plan ==
> *(1) Generate explode(col#27), [c1#17|#17], false, [col#30|#30]
> +- *(1) Filter ((size(col#27, true) > 0) AND isnotnull(col#27))
>    +- *(1) Generate explode(c2#18), [c1#17|#17], false, [col#27|#27]
>       +- *(1) Project [_1#6 AS c1#17, _2#7 AS c2#18|#6 AS c1#17, _2#7 AS 
> c2#18]
>          +- *(1) Filter ((size(_2#7, true) > 0) AND isnotnull(_2#7))
>             +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, 
> scala.Tuple5, true]))._1 AS _1#6, mapobjects(lambdavariable(MapObject, 
> ObjectType(class java.lang.Object), true, -1), 
> mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), 
> true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
> StringType, fromString, validateexternaltype(lambdavariable(MapObject, 
> ObjectType(class java.lang.Object), true, -2), StringType, ObjectType(class 
> java.lang.String)), true, false, true), 
> validateexternaltype(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -1), ArrayType(StringType,true), 
> ObjectType(interface scala.collection.Seq)), None), 
> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._2, None) AS _2#7, 
> mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), 
> true, -3), mapobjects(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -4), 
> assertnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -4), IntegerType, IntegerType)), 
> validateexternaltype(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -3), ArrayType(IntegerType,false), 
> ObjectType(interface scala.collection.Seq)), None), 
> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._3, None) AS _3#8, 
> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
> fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._4, 
> true, false, true) AS _4#9, mapobjects(lambdavariable(MapObject, 
> ObjectType(class java.lang.Object), true, -5), 
> mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), 
> true, -6), if (isnull(validateexternaltype(lambdavariable(MapObject, 
> ObjectType(class java.lang.Object), true, -6), 
> StructField(PARTNAME,StringType,true), 
> StructField(PartNumber,StringType,true), 
> StructField(PartPrice,DoubleType,false), ObjectType(class 
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass)))) null else 
> named_struct(PARTNAME, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), 
> StructField(PartNumber,StringType,true), 
> StructField(PartPrice,DoubleType,false), ObjectType(class 
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PARTNAME, true, 
> false, true), PartNumber, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), 
> StructField(PartNumber,StringType,true), 
> StructField(PartPrice,DoubleType,false), ObjectType(class 
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PartNumber, true, 
> false, true), PartPrice, 
> knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), 
> StructField(PartNumber,StringType,true), 
> StructField(PartPrice,DoubleType,false), ObjectType(class 
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PartPrice), 
> validateexternaltype(lambdavariable(MapObject, ObjectType(class 
> java.lang.Object), true, -5), 
> ArrayType(StructType(StructField(PARTNAME,StringType,true),StructField(PartNumber,StringType,true),StructField(PartPrice,DoubleType,false)),true),
>  ObjectType(interface scala.collection.Seq)), None), 
> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._5, None) AS _5#10]
>                +- Scan[obj#5|#5]
>  
>  
> Because the explode function can create multiple rows from a single row  , we 
> should account for the memory available when adding rows to the buffer .  
>  
> This is even more important when we are exploding nested arrays . 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to