zhengruifeng commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r506167739
##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,62 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
+
+ def blokifyWithMaxMemUsage(
+ iterator: Iterator[Instance],
+ maxMemUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemUsage > 0)
+
+ new Iterator[InstanceBlock] {
+ private var numCols = -1L
+ private val buff = mutable.ArrayBuilder.make[Instance]
+
+ override def hasNext: Boolean = iterator.hasNext
+
+ override def next(): InstanceBlock = {
+ buff.clear()
+ var buffCnt = 0L
+ var buffNnz = 0L
+ var buffUnitWeight = true
+ var blockMemUsage = 0L
+
+ while (iterator.hasNext && blockMemUsage < maxMemUsage) {
+ val instance = iterator.next()
+ if (numCols < 0L) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val nnz = instance.features.numNonzeros
+
+ buff += instance
+ buffCnt += 1L
+ buffNnz += nnz
+ buffUnitWeight &&= (instance.weight == 1)
+ blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz,
buffUnitWeight)
+ }
+
+ // the block mem usage may slightly exceed threshold, not a big issue.
+ // and this ensure even if one row exceed block limit, each block has one row
+ InstanceBlock.fromInstances(buff.result())
+ }
+ }
+ }
+
+ def blokifyWithMaxMemUsage(
+ instances: RDD[Instance],
+ maxMemUsage: Long): RDD[InstanceBlock] = {
+ require(maxMemUsage > 0)
+ instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+ }
+
+ def inferBlockSizeInMB(
+ dim: Int,
+ avgNNZ: Double,
+ blasLevel: Int = 2): Double = {
+ if (dim <= avgNNZ * 3) {
+ 0.25
+ } else {
+ 64.0
+ }
Review comment:
The current strategy is quite simple; I think we may use a more complex
cost model if necessary in the future.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]