[3/4] spark git commit: [SPARK-14615][ML] Use the new ML Vector and Matrix in the ML pipeline based algorithms

2016-05-17 Thread meng
http://git-wip-us.apache.org/repos/asf/spark/blob/ff1cfce1/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index 626e97e..9d084b5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -21,11 +21,14 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml._
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -93,7 +96,9 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StandardScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
+      case Row(v: Vector) => OldVectors.fromML(v)
+    }
     val scaler = new feature.StandardScaler(withMean = $(withMean), withStd = $(withStd))
     val scalerModel = scaler.fit(input)
     copyValues(new StandardScalerModel(uid, scalerModel.std, scalerModel.mean).setParent(this))
@@ -145,7 +150,11 @@ class StandardScalerModel private[ml] (
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean))
-    val scale = udf { scaler.transform _ }
+
+    // TODO: Make the transformer natively in ml framework to avoid extra conversion.
+    val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML
+
+    val scale = udf(transformer)
     dataset.withColumn($(outputCol), scale(col($(inputCol))))
   }
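
The fit() and transform() changes above share one round-trip pattern: DataFrame rows now carry the new org.apache.spark.ml.linalg.Vector, which is down-converted to the old mllib type before calling the existing mllib implementation, then converted back with .asML. Below is a minimal, standalone sketch of that round trip; the object name is illustrative and not part of the patch, and it assumes a Spark 2.0+ classpath where both linalg packages are available. (The VectorImplicits._ import added above supplies private[spark] implicit forms of the same conversions, so it is not usable from user code and is not shown here.)

import org.apache.spark.ml.linalg.{Vector => NewVector, Vectors => NewVectors}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// Illustrative sketch; not part of the patch.
object VectorRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // New-API vector, the type stored in DataFrame columns after this patch.
    val v: NewVector = NewVectors.dense(1.0, 2.0, 3.0)

    // Down-convert to the old mllib type to reuse existing mllib code,
    // mirroring OldVectors.fromML(v) in fit() above.
    val old = OldVectors.fromML(v)

    // Convert back to the new API, mirroring scaler.transform(...).asML.
    val back: NewVector = old.asML
    assert(back == v)
  }
}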
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ff1cfce1/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 4d3e46e..1bc2420 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -23,10 +23,10 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute, UnresolvedAttribute}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
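
VectorAssembler itself only swaps the import, but the user-visible effect is that its output column now holds new-API vectors. A hypothetical usage sketch follows; the SparkSession setup, column names, and object name are illustrative and not taken from the patch.

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{Row, SparkSession}

object AssemblerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1.0, 2.0), (3.0, 4.0)).toDF("a", "b")
    val assembled = new VectorAssembler()
      .setInputCols(Array("a", "b"))
      .setOutputCol("features")
      .transform(df)

    // After this patch, "features" holds org.apache.spark.ml.linalg.Vector values.
    assembled.select("features").collect().foreach {
      case Row(v: Vector) => println(v)
    }
    spark.stop()
  }
}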

http://git-wip-us.apache.org/repos/asf/spark/blob/ff1cfce1/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index 68b699d..2bc9d22 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -27,10 +27,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.attribute._
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, VectorUDT}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{StructField, StructType}
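
VectorIndexer likewise changes only the import, so its internal matching on dense and sparse vectors now targets the new package. A small sketch of such a match against the ml.linalg types; the object name is illustrative.

import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}

object DenseSparseSketch {
  def main(args: Array[String]): Unit = {
    val vectors: Seq[Vector] = Seq(
      Vectors.dense(0.0, 1.0, 2.0),
      Vectors.sparse(3, Array(1), Array(1.0)))

    vectors.foreach {
      case d: DenseVector  => println(s"dense with ${d.values.length} values")
      case s: SparseVector => println(s"sparse with ${s.indices.length} non-zeros")
    }
  }
}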

