mengxr commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r517854961
##########
File path: mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
##########
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
/** @group expertGetParam */
final def getBlockSize: Int = $(blockSize)
}
+
+/**
+ * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasBlockSizeInMB extends Params {
+
+ /**
+ * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.
+ * @group expertParam
+ */
+ final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB", "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", ParamValidators.gtEq(0.0))
Review comment:
Shall we call it `maxBlockSizeMB`? The current name suggests that we try
to match the block size. Calling it `maxBlockSizeMB` would leave us some space
for optimization.
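For concreteness, a minimal sketch of the shared param under the suggested name
(hypothetical; the exact name and doc string are still open):

```scala
trait HasMaxBlockSizeMB extends Params {

  /**
   * Param for maximum memory in MB for stacking input data in blocks.
   * If 0, try to infer an appropriate value from the statistics of the dataset.
   * @group expertParam
   */
  final val maxBlockSizeMB: DoubleParam = new DoubleParam(this, "maxBlockSizeMB",
    "Maximum memory in MB for stacking input data in blocks. Must be >= 0.",
    ParamValidators.gtEq(0.0))

  /** @group expertGetParam */
  final def getMaxBlockSizeMB: Double = $(maxBlockSizeMB)
}
```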
##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
+
+ def blokifyWithMaxMemUsage(
+ iterator: Iterator[Instance],
+ maxMemUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemUsage > 0)
+
+ var numCols = -1L
+ val buff = mutable.ArrayBuilder.make[Instance]
+ var buffCnt = 0L
+ var buffNnz = 0L
+ var buffUnitWeight = true
+
+ iterator.flatMap { instance =>
+ if (numCols < 0L) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val nnz = instance.features.numNonzeros
+ buff += instance
+ buffCnt += 1L
+ buffNnz += nnz
+ buffUnitWeight &&= (instance.weight == 1)
+
+ if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ buff.clear()
+ buffCnt = 0L
+ buffNnz = 0L
+ buffUnitWeight = true
+ Iterator.single(block)
+ } else Iterator.empty
+ } ++ {
+ if (buffCnt > 0) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ Iterator.single(block)
+ } else Iterator.empty
+ }
+ }
+
+ def blokifyWithMaxMemUsage(
+ instances: RDD[Instance],
+ maxMemUsage: Long): RDD[InstanceBlock] = {
+ require(maxMemUsage > 0)
+ instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+ }
+
+
+ /**
+ * Suggested value for BlockSizeInMB, based on performance tests of BLAS operations.
+ *
+ * @param dim size of vector.
+ * @param avgNNZ average nnz of vectors.
+ * @param blasLevel level of BLAS operation.
+ */
+ def inferBlockSizeInMB(
+ dim: Int,
+ avgNNZ: Double,
+ blasLevel: Int = 2): Double = {
Review comment:
Not used.
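If it is meant to be wired in, a hypothetical resolution step inside an
estimator could look like the following (`numFeatures` and `avgNNZ` are
placeholders for whatever dataset statistics are actually available, not this
PR's API):

```scala
// Hypothetical: resolve the effective block size before blockifying.
// A value of 0 means "infer from dataset statistics".
val actualBlockSizeInMB =
  if ($(blockSizeInMB) == 0.0) {
    InstanceBlock.inferBlockSizeInMB(numFeatures, avgNNZ)
  } else {
    $(blockSizeInMB)
  }
val maxMemUsage = (actualBlockSizeInMB * 1024 * 1024).ceil.toLong
val blocks = InstanceBlock.blokifyWithMaxMemUsage(instances, maxMemUsage)
```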
##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
+
+ def blokifyWithMaxMemUsage(
+ iterator: Iterator[Instance],
+ maxMemUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemUsage > 0)
+
+ var numCols = -1L
+ val buff = mutable.ArrayBuilder.make[Instance]
+ var buffCnt = 0L
+ var buffNnz = 0L
+ var buffUnitWeight = true
+
+ iterator.flatMap { instance =>
+ if (numCols < 0L) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val nnz = instance.features.numNonzeros
+ buff += instance
+ buffCnt += 1L
+ buffNnz += nnz
+ buffUnitWeight &&= (instance.weight == 1)
+
+ if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ buff.clear()
+ buffCnt = 0L
+ buffNnz = 0L
+ buffUnitWeight = true
+ Iterator.single(block)
+ } else Iterator.empty
+ } ++ {
+ if (buffCnt > 0) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ Iterator.single(block)
+ } else Iterator.empty
+ }
+ }
+
+ def blokifyWithMaxMemUsage(
+ instances: RDD[Instance],
+ maxMemUsage: Long): RDD[InstanceBlock] = {
+ require(maxMemUsage > 0)
+ instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+ }
+
+
+ /**
+ * Suggested value for BlockSizeInMB, based on performance tests of BLAS operations.
+ *
+ * @param dim size of vector.
+ * @param avgNNZ average nnz of vectors.
+ * @param blasLevel level of BLAS operation.
+ */
+ def inferBlockSizeInMB(
+ dim: Int,
+ avgNNZ: Double,
+ blasLevel: Int = 2): Double = {
+ if (dim <= avgNNZ * 3) {
+ // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+ // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+ // and fall back to the Java implementation (f2jBLAS) if necessary.
+ // The suggested value for dense cases is 0.25.
+ 0.25
+ } else {
+ // When the dataset is sparse, Spark will use its own Scala implementation.
+ // The suggested value for sparse cases is 64.0.
+ 64.0
Review comment:
A little surprising to see the default suddenly jump from 0.25MB to 64MB.
This is very risky because a 64MB sparse block could generate a much bigger
dense result, e.g., in multi-class logistic regression or k-means, if we
eventually blockify their implementations. In your benchmark, it seems we
start observing speed-ups at 1MB. I would be very conservative here.
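To make the risk concrete with rough, hypothetical numbers (assuming 8-byte
doubles and ~12 bytes per stored nonzero in a sparse layout):

```scala
// A 64MB sparse block with avgNNZ = 10 per row holds roughly 560k rows.
val blockBytes = 64L * 1024 * 1024
val numRows = blockBytes / 12 / 10
// In multinomial logistic regression with 100 classes, the dense margin
// matrix for that block alone is rows x classes x 8 bytes:
val marginBytes = numRows * 100 * 8
println(marginBytes / (1024 * 1024))  // ~427MB of dense intermediate data
```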
##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
+
+ def blokifyWithMaxMemUsage(
+ iterator: Iterator[Instance],
+ maxMemUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemUsage > 0)
+
+ var numCols = -1L
+ val buff = mutable.ArrayBuilder.make[Instance]
+ var buffCnt = 0L
+ var buffNnz = 0L
+ var buffUnitWeight = true
+
+ iterator.flatMap { instance =>
+ if (numCols < 0L) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val nnz = instance.features.numNonzeros
+ buff += instance
+ buffCnt += 1L
+ buffNnz += nnz
+ buffUnitWeight &&= (instance.weight == 1)
+
+ if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ buff.clear()
+ buffCnt = 0L
+ buffNnz = 0L
+ buffUnitWeight = true
+ Iterator.single(block)
+ } else Iterator.empty
+ } ++ {
+ if (buffCnt > 0) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ Iterator.single(block)
+ } else Iterator.empty
+ }
+ }
+
+ def blokifyWithMaxMemUsage(
+ instances: RDD[Instance],
+ maxMemUsage: Long): RDD[InstanceBlock] = {
+ require(maxMemUsage > 0)
+ instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+ }
+
+
+ /**
+ * Suggested value for BlockSizeInMB, based on performance tests of BLAS operations.
Review comment:
Could you link to the JIRA or this PR that has the performance tests?
##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,74 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
+
+ def blokifyWithMaxMemUsage(
+ iterator: Iterator[Instance],
+ maxMemUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemUsage > 0)
+
+ var numCols = -1L
+ val buff = mutable.ArrayBuilder.make[Instance]
+ var buffCnt = 0L
+ var buffNnz = 0L
+ var buffUnitWeight = true
+
+ iterator.flatMap { instance =>
+ if (numCols < 0L) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val nnz = instance.features.numNonzeros
+ buff += instance
+ buffCnt += 1L
+ buffNnz += nnz
+ buffUnitWeight &&= (instance.weight == 1)
+
+ if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ buff.clear()
+ buffCnt = 0L
+ buffNnz = 0L
+ buffUnitWeight = true
+ Iterator.single(block)
+ } else Iterator.empty
+ } ++ {
+ if (buffCnt > 0) {
+ val block = InstanceBlock.fromInstances(buff.result())
+ Iterator.single(block)
+ } else Iterator.empty
+ }
+ }
+
+ def blokifyWithMaxMemUsage(
+ instances: RDD[Instance],
+ maxMemUsage: Long): RDD[InstanceBlock] = {
+ require(maxMemUsage > 0)
+ instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+ }
+
+
+ /**
+ * Suggested value for BlockSizeInMB, based on performance tests of BLAS operations.
+ *
+ * @param dim size of vector.
+ * @param avgNNZ average nnz of vectors.
+ * @param blasLevel level of BLAS operation.
+ */
+ def inferBlockSizeInMB(
+ dim: Int,
+ avgNNZ: Double,
+ blasLevel: Int = 2): Double = {
+ if (dim <= avgNNZ * 3) {
+ // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical
+ // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL),
+ // and fall back to the Java implementation (f2jBLAS) if necessary.
+ // The suggested value for dense cases is 0.25.
+ 0.25
Review comment:
We need more comments to explain how this default value was picked. 0.25MB is
roughly a 180x180 matrix of doubles, which seems quite small to me.
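A quick back-of-the-envelope check of that figure, assuming 8-byte doubles:

```scala
val doublesPerBlock = (0.25 * 1024 * 1024 / 8).toInt  // 32768 doubles in 0.25MB
val side = math.sqrt(doublesPerBlock.toDouble)        // ~181, i.e. roughly 180x180
```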