srowen commented on a change in pull request #26803: [SPARK-30178][ML] RobustScaler support large numFeatures
URL: https://github.com/apache/spark/pull/26803#discussion_r357134179
 
 

 ##########
 File path: mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
 ##########
 @@ -147,49 +146,44 @@ class RobustScaler (override val uid: String)
 
   override def fit(dataset: Dataset[_]): RobustScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val localRelativeError = $(relativeError)
 
-    val summaries = dataset.select($(inputCol)).rdd.map {
-      case Row(vec: Vector) => vec
-    }.mapPartitions { iter =>
-      var agg: Array[QuantileSummaries] = null
-      while (iter.hasNext) {
-        val vec = iter.next()
-        if (agg == null) {
-          agg = Array.fill(vec.size)(
-          new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, localRelativeError))
-        }
-        require(vec.size == agg.length,
-          s"Number of dimensions must be ${agg.length} but got ${vec.size}")
-        var i = 0
-        while (i < vec.size) {
-          agg(i) = agg(i).insert(vec(i))
-          i += 1
-        }
-      }
+    val vectors = dataset.select($(inputCol)).rdd.map { case Row(vec: Vector) => vec }
+    val numFeatures = MetadataUtils.getNumFeatures(dataset.schema($(inputCol)))
+      .getOrElse(vectors.first().size)
 
-      if (agg == null) {
-        Iterator.empty
-      } else {
-        Iterator.single(agg.map(_.compress))
-      }
-    }.treeReduce { (agg1, agg2) =>
-      require(agg1.length == agg2.length)
-      var i = 0
-      while (i < agg1.length) {
-        agg1(i) = agg1(i).merge(agg2(i))
-        i += 1
+    val localRelativeError = $(relativeError)
+    val localUpper = $(upper)
+    val localLower = $(lower)
+
+    // compute scale by the logic in treeAggregate with depth=2
+    // TODO: design a common treeAggregateByKey in PairRDDFunctions?
+    val scale = math.max(math.ceil(math.sqrt(vectors.getNumPartitions)).toInt, 2)
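For reference, the `scale` expression in the diff mirrors the intermediate fan-in that `RDD.treeAggregate` uses when `depth = 2`: roughly the square root of the partition count, with a floor of 2. A minimal stand-alone sketch (plain Scala, no Spark dependency; `aggregationScale` is a hypothetical helper name, not part of the PR):

```scala
// Hypothetical helper mirroring the scale computation quoted above.
// treeAggregate with depth=2 reduces numPartitions down to roughly
// sqrt(numPartitions) intermediate aggregation groups, floored at 2.
def aggregationScale(numPartitions: Int): Int =
  math.max(math.ceil(math.sqrt(numPartitions.toDouble)).toInt, 2)
```

So 100 input partitions collapse to 10 intermediate groups, while very small partition counts still get the minimum of 2.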
 
 Review comment:
  Hm, yeah, never mind the comment about partitions; that doesn't change. It's just the number of aggregation keys. I don't know whether increasing the number of aggregation keys is something we should avoid or not, but that's not the reason, OK.
  
  Anyway, I'd still be interested in how it performs before and after, with any rough benchmark, for the small-N case. I'm just somewhat concerned it could be slower for that common case, because now the whole data set is chopped up finely and shuffled in all cases. That doesn't mean we can't do it; I'm just wondering if there's a big difference to know about.
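To make the "chopped up finely" concern concrete: the keyed approach explodes every vector into (featureIndex, value) pairs and aggregates by key, so each element of the data set becomes a shuffle record. A conceptual sketch with plain Scala collections (not Spark; a simple sum stands in for the real `QuantileSummaries` insert/merge, and all names here are illustrative):

```scala
// Two toy vectors with two features each.
val vectors = Seq(Array(1.0, 2.0), Array(3.0, 4.0))

// Explode every vector element into a (featureIndex, value) pair, then
// aggregate per feature. In Spark, this grouping step is the shuffle
// that the small-N performance concern is about: the old mapPartitions
// approach aggregated locally and only shuffled compressed summaries.
val perFeature: Map[Int, Double] = vectors
  .flatMap(v => v.indices.map(i => i -> v(i)))
  .groupBy(_._1)
  .map { case (i, pairs) => i -> pairs.map(_._2).sum }
```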

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 