zhengruifeng commented on PR #47742:
URL: https://github.com/apache/spark/pull/47742#issuecomment-2287690666

   I tested four approaches with the following benchmark:
   ```
   import org.apache.spark.ml.feature.StringIndexer
   
   val numCol = 300
   val data = (0 to 10000).map { i => (i, 100 * i) }
   var df = data.toDF("id", "label0")
   (1 to numCol).foreach { idx =>
     df = df.withColumn(s"label$idx", col("label0") + 1)
   }
   val inputCols = (0 to numCol).map(i => s"label$i").toArray
   val outputCols = (0 to numCol).map(i => s"labelIndex$i").toArray
   val indexer = new StringIndexer()
     .setInputCols(inputCols)
     .setOutputCols(outputCols)
     .setStringOrderType("frequencyAsc")
     .fit(df)
   ```
   
   1, master
   2, the GroupedCount expression
   3, regular aggregate I (similar to https://github.com/apache/spark/pull/47742#discussion_r1715912924)
   ```
       val selectedCols = getSelectedCols(dataset, inputCols.toImmutableArraySeq)
       val results = Array.fill(selectedCols.size)(new OpenHashMap[String, Long]())
       dataset.select(posexplode(array(selectedCols: _*)).as(Seq("index", "value")))
         .where(col("value").isNotNull)
         .groupBy("index", "value")
         .agg(count(lit(1)).as("count"))
         .collect()
         .foreach { row =>
           val index = row.getInt(0)
           val value = row.getString(1)
           val count = row.getLong(2)
           results(index).update(value, count)
         }
       results
   ```
   4, regular aggregate II
   ```
       val selectedCols = getSelectedCols(dataset, inputCols.toImmutableArraySeq)
       val results = Array.fill(selectedCols.size)(new OpenHashMap[String, Long]())
       dataset.select(posexplode(array(selectedCols: _*)).as(Seq("index", "value")))
         .where(col("value").isNotNull)
         .groupBy("index", "value")
         .agg(count(lit(1)).as("count"))
         .groupBy("index")
         .agg(collect_list(struct("value", "count")))
         .collect()
         .foreach { row =>
           val index = row.getInt(0)
           val map = results(index)
           row.getSeq[Row](1).foreach {
             case Row(value: String, count: Long) => map.update(value, count)
           }
         }
       results
   ```
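
   For reference, here is a minimal plain-Scala sketch (no Spark) of the per-column value counts that approaches 3 and 4 produce through `posexplode` + `groupBy("index", "value")`. The names `GroupedCountSketch` and `countPerColumn` are hypothetical and only for illustration; they are not part of this PR, and the sketch says nothing about shuffle or memory behavior.

   ```scala
   // Plain-Scala illustration only: no Spark, no shuffle.
   // `GroupedCountSketch` and `countPerColumn` are hypothetical names.
   object GroupedCountSketch {
     // Each row holds one nullable string per selected column.
     // Returns, for every column index, a map from value to its count.
     def countPerColumn(rows: Seq[Seq[String]], numCols: Int): Array[Map[String, Long]] = {
       val counts = Array.fill(numCols)(scala.collection.mutable.Map.empty[String, Long])
       // (value, index) pairs mirror the "value" / "index" columns of posexplode;
       // nulls are skipped, like the `.where(col("value").isNotNull)` filter.
       for (row <- rows; (value, index) <- row.zipWithIndex if value != null) {
         counts(index)(value) = counts(index).getOrElse(value, 0L) + 1L
       }
       counts.map(_.toMap)
     }

     def main(args: Array[String]): Unit = {
       val rows = Seq(Seq("a", "x"), Seq("a", null), Seq("b", "x"))
       val result = countPerColumn(rows, numCols = 2)
       result.zipWithIndex.foreach { case (m, i) => println(s"column $i: $m") }
     }
   }
   ```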
   
   The duration is almost the same for all four approaches, about 31 seconds.
   But the shuffle size varies:
   1, 22.0 MiB
   2, 14.9 MiB
   3, 26.0 MiB
   4, 26.0 MiB + 28.7 MiB (it has two stages)
   
   I cannot compare the memory usage, since 1 and 2 use `ObjectHashAggregate`, which does not seem to report memory usage.
   
![image](https://github.com/user-attachments/assets/195781a4-6b2b-4ff8-b432-1d2d9ed42932)
   
   
   So shall we still go with the `GroupedCount` expression? My reasons are:
   1, it is just a replacement for the existing `StringIndexerAggregator`
   2, its shuffle size is the smallest of the four approaches.
   
   @hvanhovell @HyukjinKwon @WeichenXu123 
   
   
   

