zhengruifeng commented on PR #47742:
URL: https://github.com/apache/spark/pull/47742#issuecomment-2287690666
I tested four approaches with:
```
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.functions.col

val numCol = 300
val data = (0 to 10000).map { i => (i, 100 * i) }
var df = data.toDF("id", "label0")
(1 to numCol).foreach { idx =>
  df = df.withColumn(s"label$idx", col("label0") + 1)
}
val inputCols = (0 to numCol).map(i => s"label$i").toArray
val outputCols = (0 to numCol).map(i => s"labelIndex$i").toArray
val indexer = new StringIndexer()
  .setInputCols(inputCols)
  .setOutputCols(outputCols)
  .setStringOrderType("frequencyAsc")
  .fit(df)
```
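Each run can be timed around the `fit` call, roughly along these lines (a rough sketch, not the exact harness used):
```
// Rough timing wrapper (illustrative only): wall-clock time of the fit.
val start = System.nanoTime()
val model = new StringIndexer()
  .setInputCols(inputCols)
  .setOutputCols(outputCols)
  .setStringOrderType("frequencyAsc")
  .fit(df)
println(s"fit took ${(System.nanoTime() - start) / 1e9} s")
```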
The four approaches:

1. master
2. the `GroupedCount` expression
3. regular aggregate I (similar to https://github.com/apache/spark/pull/47742#discussion_r1715912924; see the toy `posexplode` sketch below):
```
val selectedCols = getSelectedCols(dataset, inputCols.toImmutableArraySeq)
val results = Array.fill(selectedCols.size)(new OpenHashMap[String, Long]())
// Explode all input columns into (index, value) rows, then count each
// distinct value per column in a single aggregation.
dataset.select(posexplode(array(selectedCols: _*)).as(Seq("index", "value")))
  .where(col("value").isNotNull)
  .groupBy("index", "value")
  .agg(count(lit(1)).as("count"))
  .collect()
  .foreach { row =>
    val index = row.getInt(0)
    val value = row.getString(1)
    val count = row.getLong(2)
    results(index).update(value, count)
  }
results
```
4. regular aggregate II (as approach 3, plus a second aggregation that collects the per-column counts into one row per column):
```
val selectedCols = getSelectedCols(dataset, inputCols.toImmutableArraySeq)
val results = Array.fill(selectedCols.size)(new OpenHashMap[String, Long]())
dataset.select(posexplode(array(selectedCols: _*)).as(Seq("index", "value")))
  .where(col("value").isNotNull)
  .groupBy("index", "value")
  .agg(count(lit(1)).as("count"))
  // Second aggregation (and second shuffle): gather all (value, count)
  // pairs of a column into a single row, so the driver receives one
  // row per column instead of one row per distinct value.
  .groupBy("index")
  .agg(collect_list(struct("value", "count")))
  .collect()
  .foreach { row =>
    val index = row.getInt(0)
    val map = results(index)
    row.getSeq[Row](1).foreach {
      case Row(value: String, count: Long) => map.update(value, count)
    }
  }
results
```
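Both regular-aggregate variants rely on `posexplode(array(...))` to turn each row into one `(index, value)` row per input column; here is a tiny standalone sketch (toy data, not from the benchmark) of what it produces:
```
import org.apache.spark.sql.functions._

// Toy data with two string columns; posexplode tags every value with
// its column position, so a single groupBy("index", "value") counts
// all columns in one shuffle.
val toy = Seq(("x", "y"), ("x", "z")).toDF("a", "b")
toy.select(posexplode(array(col("a"), col("b"))).as(Seq("index", "value")))
  .groupBy("index", "value")
  .agg(count(lit(1)).as("count"))
  .show()
// index 0 (column "a"): ("x", 2); index 1 (column "b"): ("y", 1), ("z", 1)
```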
The duration is almost the same for all four approaches, ~31 seconds.
But the shuffle size varies:

1. 22.0 MiB
2. 14.9 MiB
3. 26.0 MiB
4. 26.0 MiB + 28.7 MiB (it has two shuffle stages)
I cannot compare the memory usage, since approaches 1 and 2 use `ObjectHashAggregate`, which does not seem to report its memory usage.
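For reference, one can verify which aggregate implementation a variant picks by printing its physical plan, e.g. reusing the approach-3 query (a quick sketch):
```
// Print the physical plan and look for ObjectHashAggregate vs
// HashAggregate / SortAggregate nodes in the output.
dataset.select(posexplode(array(selectedCols: _*)).as(Seq("index", "value")))
  .where(col("value").isNotNull)
  .groupBy("index", "value")
  .agg(count(lit(1)).as("count"))
  .explain()
```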

So shall we still apply the `GroupedCount` expression? Because:

1. it is just a drop-in replacement for the existing `StringIndexerAggregator`;
2. its shuffle size is the smallest of all the approaches.
@hvanhovell @HyukjinKwon @WeichenXu123