zhengruifeng commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570745838



##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       `val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to