mengxr commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r517854961



##########
File path: mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
##########
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
   /** @group expertGetParam */
   final def getBlockSize: Int = $(blockSize)
 }
+
+/**
+ * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasBlockSizeInMB extends Params {
+
+  /**
+   * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0..
+   * @group expertParam
+   */
+  final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB", "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", ParamValidators.gtEq(0.0))

Review comment:
       Shall we call it `maxBlockSizeMB`? The current name suggests that we try 
to match the block size. Calling it `maxBlockSizeMB` would leave us some space 
for optimization.
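
For illustration, a minimal sketch of what the renamed shared param could look like; the trait and getter names below (`HasMaxBlockSizeMB`, `getMaxBlockSizeMB`) are hypothetical and only follow the naming suggestion above, not the current PR code:

```scala
import org.apache.spark.ml.param.{DoubleParam, ParamValidators, Params}

// Hypothetical sketch of the shared param after the suggested rename.
trait HasMaxBlockSizeMB extends Params {

  /** @group expertParam */
  final val maxBlockSizeMB: DoubleParam = new DoubleParam(this, "maxBlockSizeMB",
    "Maximum memory in MB for stacking input data into blocks. If 0, try to infer " +
    "an appropriate value based on the statistics of the dataset. Must be >= 0.",
    ParamValidators.gtEq(0.0))

  /** @group expertGetParam */
  final def getMaxBlockSizeMB: Double = $(maxBlockSizeMB)
}
```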

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
  }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.
+   *
+   * @param dim size of vector.
+   * @param avgNNZ average nnz of vectors.
+   * @param blasLevel level of BLAS operation.
+   */
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {

Review comment:
       Not used.
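
As a side note, a hedged sketch of how the blockification path added in this diff might be driven from caller code (the 1 MiB budget below is purely illustrative, not a recommended default):

```scala
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.rdd.RDD

// Illustrative only: blockify instances under a fixed per-block memory budget.
def toBlocks(instances: RDD[Instance]): RDD[InstanceBlock] = {
  val maxMemUsage = 1L * 1024 * 1024 // 1 MiB, example value
  InstanceBlock.blokifyWithMaxMemUsage(instances, maxMemUsage)
}
```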

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
  }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.
+   *
+   * @param dim size of vector.
+   * @param avgNNZ average nnz of vectors.
+   * @param blasLevel level of BLAS operation.
+   */
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+      // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+      // and fallback to the Java implementation (f2jBLAS) if necessary.
+      // The suggested value for dense cases is 0.25.
+      0.25
+    } else {
+      // When the dataset is sparse, Spark will use its own Scala implementation.
+      // The suggested value for sparse cases is 64.0.
+      64.0

Review comment:
       A little surprising to see the default suddenly jump from 0.25MB to 64MB. This
is very risky because a 64MB sparse block could produce a much bigger dense result,
e.g., in multi-class logistic regression or k-means, if we eventually blockify their
implementations. In your benchmark, it seems we start observing a speed-up at 1MB. I
would be very conservative here.
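
To make the concern concrete, rough back-of-envelope arithmetic (all numbers here are illustrative, not measurements from the benchmark):

```scala
// A 64MB block of sparse rows can imply a much larger dense intermediate,
// e.g. the margin matrix in multi-class logistic regression.
val blockBytes   = 64L * 1024 * 1024        // sparse block budget
val bytesPerRow  = 10 * (8 + 4)             // ~10 nnz per row: double value + int index
val rowsPerBlock = blockBytes / bytesPerRow // ~559k rows fit in the block
val numClasses   = 100                      // hypothetical number of classes
val denseBytes   = rowsPerBlock * numClasses * 8L
println(denseBytes / (1024.0 * 1024.0))     // ~426 MB of dense margins per block
```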

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
  }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.

Review comment:
       Could you link to the JIRA or this PR that has the performance tests?

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
  }
+
+  def blokifyWithMaxMemUsage(
+      iterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    var numCols = -1L
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var buffCnt = 0L
+    var buffNnz = 0L
+    var buffUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val nnz = instance.features.numNonzeros
+      buff += instance
+      buffCnt += 1L
+      buffNnz += nnz
+      buffUnitWeight &&= (instance.weight == 1)
+
+      if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        buff.clear()
+        buffCnt = 0L
+        buffNnz = 0L
+        buffUnitWeight = true
+        Iterator.single(block)
+      } else Iterator.empty
+    } ++ {
+      if (buffCnt > 0) {
+        val block = InstanceBlock.fromInstances(buff.result())
+        Iterator.single(block)
+      } else Iterator.empty
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
+
+
+  /**
+   * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation.
+   *
+   * @param dim size of vector.
+   * @param avgNNZ average nnz of vectors.
+   * @param blasLevel level of BLAS operation.
+   */
+  def inferBlockSizeInMB(
+      dim: Int,
+      avgNNZ: Double,
+      blasLevel: Int = 2): Double = {
+    if (dim <= avgNNZ * 3) {
+      // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+      // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+      // and fallback to the Java implementation (f2jBLAS) if necessary.
+      // The suggested value for dense cases is 0.25.
+      0.25

Review comment:
       We need more comments to explain how this default value was picked. 0.25MB is
roughly a 180x180 matrix of doubles, which seems quite small to me.
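
For reference, the arithmetic behind that estimate (assuming 8 bytes per double and ignoring object overhead):

```scala
val bytes   = (0.25 * 1024 * 1024).toLong // 0.25 MB block budget = 262144 bytes
val doubles = bytes / 8                   // 32768 double values
math.sqrt(doubles.toDouble)               // ~181, i.e. roughly a 180 x 180 dense block
```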



