WeichenXu123 commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r506018249
##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
##########
@@ -199,14 +193,11 @@ class LinearSVC @Since("2.2.0") (
instr.logNamedValue("lowestLabelWeight",
labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight",
labelSummarizer.histogram.max.toString)
instr.logSumOfWeights(summarizer.weightSum)
- if ($(blockSize) > 1) {
- val scale = 1.0 / summarizer.count / numFeatures
- val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
- instr.logNamedValue("sparsity", sparsity.toString)
- if (sparsity > 0.5) {
- instr.logWarning(s"sparsity of input dataset is $sparsity, " +
- s"which may hurt performance in high-level BLAS.")
- }
+ if (actualBlockSizeInMB == 0) {
+ val avgNNZ = summarizer.numNonzeros.activeIterator.map(_._2 / summarizer.count).sum
Review comment:
Will the additional summarizer consume time?
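
For reference, a minimal pure-Scala sketch of what the added `avgNNZ` line computes; the per-feature nonzero counts and the row count below are illustrative placeholders, not the actual SummarizerBuffer API:

```scala
// Illustrative values only: numNonzeros holds, per feature, how many rows
// have a nonzero entry in that feature; count is the total number of rows.
val numNonzeros: Array[Double] = Array(1000.0, 10.0, 0.0, 990.0)
val count: Long = 1000L

// Average number of nonzero entries per row, i.e. the expected row density
// scaled by the feature dimension.
val avgNNZ: Double = numNonzeros.map(_ / count).sum
// avgNNZ == 2.0 here: on average each row has two nonzero features.
```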
##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,62 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
+
+ def blokifyWithMaxMemUsage(
+ iterator: Iterator[Instance],
+ maxMemUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemUsage > 0)
+
+ new Iterator[InstanceBlock] {
+ private var numCols = -1L
+ private val buff = mutable.ArrayBuilder.make[Instance]
+
+ override def hasNext: Boolean = iterator.hasNext
+
+ override def next(): InstanceBlock = {
+ buff.clear()
+ var buffCnt = 0L
+ var buffNnz = 0L
+ var buffUnitWeight = true
+ var blockMemUsage = 0L
+
+ while (iterator.hasNext && blockMemUsage < maxMemUsage) {
+ val instance = iterator.next()
+ if (numCols < 0L) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val nnz = instance.features.numNonzeros
+
+ buff += instance
+ buffCnt += 1L
+ buffNnz += nnz
+ buffUnitWeight &&= (instance.weight == 1)
+ blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight)
+ }
+
+ // the block mem usage may slightly exceed the threshold, which is not a big issue.
+ // this also ensures that even if a single row exceeds the block limit, each block contains at least one row
+ InstanceBlock.fromInstances(buff.result())
+ }
+ }
+ }
+
+ def blokifyWithMaxMemUsage(
+ instances: RDD[Instance],
+ maxMemUsage: Long): RDD[InstanceBlock] = {
+ require(maxMemUsage > 0)
+ instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+ }
+
+ def inferBlockSizeInMB(
+ dim: Int,
+ avgNNZ: Double,
+ blasLevel: Int = 2): Double = {
+ if (dim <= avgNNZ * 3) {
+ 0.25
+ } else {
+ 64.0
+ }
Review comment:
Could you document why these values were chosen?
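
For context, a hedged sketch of how this heuristic reads and how the returned size in MB would feed the byte budget of blokifyWithMaxMemUsage; the wiring and names below are illustrative, not the PR's exact code:

```scala
// Restating the heuristic: if on average at least roughly a third of the
// features are nonzero, the data is treated as dense and a small 0.25 MB
// block is used; otherwise the data is sparse and a large 64 MB block is used.
def inferBlockSizeInMBSketch(dim: Int, avgNNZ: Double): Double =
  if (dim <= avgNNZ * 3) 0.25 else 64.0

// Hypothetical conversion to the byte threshold expected by
// blokifyWithMaxMemUsage.
val blockSizeInMB = inferBlockSizeInMBSketch(dim = 1000, avgNNZ = 5.0) // sparse case -> 64.0
val maxMemUsage: Long = (blockSizeInMB * 1024L * 1024L).ceil.toLong
```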
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]