srowen commented on a change in pull request #26803: [SPARK-30178][ML] RobustScaler support large numFeatures
URL: https://github.com/apache/spark/pull/26803#discussion_r356579189
 
 

 ##########
 File path: mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
 ##########
 @@ -147,49 +146,44 @@ class RobustScaler (override val uid: String)
 
   override def fit(dataset: Dataset[_]): RobustScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val localRelativeError = $(relativeError)
 
-    val summaries = dataset.select($(inputCol)).rdd.map {
-      case Row(vec: Vector) => vec
-    }.mapPartitions { iter =>
-      var agg: Array[QuantileSummaries] = null
-      while (iter.hasNext) {
-        val vec = iter.next()
-        if (agg == null) {
-          agg = Array.fill(vec.size)(
-            new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, localRelativeError))
-        }
-        require(vec.size == agg.length,
-          s"Number of dimensions must be ${agg.length} but got ${vec.size}")
-        var i = 0
-        while (i < vec.size) {
-          agg(i) = agg(i).insert(vec(i))
-          i += 1
-        }
-      }
+    val vectors = dataset.select($(inputCol)).rdd.map { case Row(vec: Vector) => vec }
+    val numFeatures = MetadataUtils.getNumFeatures(dataset.schema($(inputCol)))
+      .getOrElse(vectors.first().size)
 
-      if (agg == null) {
-        Iterator.empty
-      } else {
-        Iterator.single(agg.map(_.compress))
-      }
-    }.treeReduce { (agg1, agg2) =>
-      require(agg1.length == agg2.length)
-      var i = 0
-      while (i < agg1.length) {
-        agg1(i) = agg1(i).merge(agg2(i))
-        i += 1
+    val localRelativeError = $(relativeError)
+    val localUpper = $(upper)
+    val localLower = $(lower)
+
+    // compute scale by the logic in treeAggregate with depth=2
+    // TODO: design a common treeAggregateByKey in PairRDDFunctions?
+    val scale = math.max(math.ceil(math.sqrt(vectors.getNumPartitions)).toInt, 2)
 
 Review comment:
   Thinking this through more: say there are M partitions of data, and N 
dimensions / features. This produces N*sqrt(M) new partitions. 
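
   Roughly how I'm reading the current keying, as a sketch (illustrative only, not
the actual patch; it reuses vectors, numFeatures and scale from the diff above,
and assumes the key is (featureIndex, partitionId % scale)):

```scala
// Illustrative sketch, not the actual patch: keying each feature value by
// (featureIndex, partitionId % scale), with scale ~= sqrt(M), gives roughly
// N * sqrt(M) distinct keys to aggregate.
val keyed = vectors.mapPartitionsWithIndex { (pid, iter) =>
  iter.flatMap { vec =>
    Iterator.tabulate(numFeatures) { i => ((i, pid % scale), vec(i)) }
  }
}
// e.g. N = 10 features, M = 10000 partitions => about 10 * 100 = 1000 keys
```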
   
   N*sqrt(M) is better than N before, for the case I mentioned, where N is
small and M is large. The number of partitions doesn't drop so much that you 
might lose too much parallelism for large data. You're still losing some 
parallelism compared to processing with M partitions, in the case where sqrt(M) 
> N.
   
   N*sqrt(M) is, on the other hand, a lot of partitions if N is large and M is
small, which is your case. What if N = 10000? You might go from ten partitions
to tens of thousands of keys, or a million if M is large as well as N.
   
   I'm trying to figure out if we can mostly get rid of the factor of N.
   
   What if you keyed by (i, partition / N)? You end up with about N * M/N = M 
partitions in the aggregation, which is nice. Well, that's roughly true if N < 
M. Of course, if N > M then this still gives you N partitions out, but that's 
better than N*sqrt(M) for your case I believe. WDYT?
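
   Concretely, the alternative I have in mind would look something like this
(again just a sketch, same caveats as above):

```scala
// Sketch of the alternative keying: the second key component takes roughly
// M / N distinct values, so the total key count is about N * (M / N) = M
// when N < M, and N when N >= M (pid / numFeatures is then always 0).
val keyedAlt = vectors.mapPartitionsWithIndex { (pid, iter) =>
  iter.flatMap { vec =>
    Iterator.tabulate(numFeatures) { i => ((i, pid / numFeatures), vec(i)) }
  }
}
```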
   
   I think this still bears a quick benchmark on the 'common case' of few 
features and a nontrivial amount of data / partitions. I am slightly concerned 
this might be much slower for that case.
