WeichenXu123 commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r503848595
##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,85 @@ private[spark] object InstanceBlock {
/**
 * Packs instances into blocks partition by partition.
 *
 * Each partition is cut into consecutive groups of `blockSize` instances
 * (the last group of a partition may be smaller). Each group is then turned
 * into one [[InstanceBlock]]. Groups never span partition boundaries.
 *
 * @param instances input instances
 * @param blockSize number of instances per block
 * @return an RDD with one InstanceBlock per group
 */
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
  instances.mapPartitions { partitionIter =>
    partitionIter
      .grouped(blockSize)
      .map(group => InstanceBlock.fromInstances(group))
  }
}
+
+ def blokifyWithMaxMemUsage(
+ iterator: Iterator[Instance],
+ maxMemUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemUsage > 0)
+
+ new Iterator[InstanceBlock] {
+ private var numCols = -1L
+ private val buff = mutable.ArrayBuilder.make[Instance]
+ private var buffCnt = 0L
+ private var buffNnz = 0L
+ private var buffUnitWeight = true
+ private var block = Option.empty[InstanceBlock]
+
+ private def flush(): Unit = {
+ block = Some(InstanceBlock.fromInstances(buff.result()))
+ buff.clear()
+ buffCnt = 0L
+ buffNnz = 0L
+ buffUnitWeight = true
+ }
+
+ private def blockify(): Unit = {
+ block = None
+
+ while (block.isEmpty && iterator.hasNext) {
+ val instance = iterator.next()
+ if (numCols < 0L) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val nnz = instance.features.numNonzeros
+
+ // Check if enough memory remains to add this instance to the block.
+ if (getBlockMemUsage(numCols, buffCnt + 1L, buffNnz + nnz,
+ buffUnitWeight && (instance.weight == 1)) > maxMemUsage) {
+ // Check if this instance is too large
+ require(buffCnt > 0, s"instance $instance exceeds memory limit
$maxMemUsage, " +
+ s"please increase block size")
+ flush()
+ }
+
+ buff += instance
+ buffCnt += 1L
+ buffNnz += nnz
+ buffUnitWeight &&= (instance.weight == 1)
Review comment:
After flush(), buffCnt/buffNnz are reset to 0, but then you increment them
for the current instance and exit the loop. So at the start of the next batch,
buffCnt/buffNnz won't be 0.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]