Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19746#discussion_r156279322
--- Diff:
mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.param.{IntParam, Param, ParamMap,
ParamValidators}
+import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable,
DefaultParamsWritable, Identifiable}
+import org.apache.spark.sql.{Column, DataFrame, Dataset}
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A feature transformer that adds vector size information to a vector
column.
+ */
+@Experimental
+@Since("2.3.0")
+class VectorSizeHint @Since("2.3.0") (@Since("2.3.0") override val uid:
String)
+ extends Transformer with HasInputCol with HasHandleInvalid with
DefaultParamsWritable {
+
+ @Since("2.3.0")
+ def this() = this(Identifiable.randomUID("vectSizeHint"))
+
+ @Since("2.3.0")
+ val size = new IntParam(this, "size", "Size of vectors in column.", {s:
Int => s >= 0})
+
+ @Since("2.3.0")
+ def getSize: Int = getOrDefault(size)
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setSize(value: Int): this.type = set(size, value)
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setInputCol(value: String): this.type = set(inputCol, value)
+
+ @Since("2.3.0")
+ override val handleInvalid: Param[String] = new Param[String](
+ this,
+ "handleInvalid",
+ "How to handle invalid vectors in inputCol, (invalid vectors include
nulls and vectors with " +
+ "the wrong size. The options are `skip` (filter out rows with
invalid vectors), `error` " +
+ "(throw an error) and `optimistic` (don't check the vector size).",
+ ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids))
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setHandleInvalid(value: String): this.type = set(handleInvalid,
value)
+ setDefault(handleInvalid, VectorSizeHint.ERROR_INVALID)
+
+ @Since("2.3.0")
+ override def transform(dataset: Dataset[_]): DataFrame = {
+ val localInputCol = getInputCol
+ val localSize = getSize
+ val localHandleInvalid = getHandleInvalid
+
+ val group =
AttributeGroup.fromStructField(dataset.schema(localInputCol))
+ if (localHandleInvalid == VectorSizeHint.OPTIMISTIC_INVALID &&
group.size == localSize) {
+ dataset.toDF
+ } else {
+ val newGroup = group.size match {
+ case `localSize` => group
+ case -1 => new AttributeGroup(localInputCol, localSize)
+ case _ =>
+ val msg = s"Trying to set size of vectors in `$localInputCol` to
$localSize but size " +
+ s"already set to ${group.size}."
+ throw new SparkException(msg)
+ }
+
+ val newCol: Column = localHandleInvalid match {
+ case VectorSizeHint.OPTIMISTIC_INVALID => col(localInputCol)
+ case VectorSizeHint.ERROR_INVALID =>
+ val checkVectorSizeUDF = udf { vector: Vector =>
+ if (vector == null) {
+ throw new SparkException(s"Got null vector in
VectorSizeHint, set `handleInvalid` " +
+ s"to 'skip' to filter invalid rows.")
+ }
+ if (vector.size != localSize) {
+ throw new SparkException(s"VectorSizeHint Expecting a vector
of size $localSize but" +
+ s" got ${vector.size}")
+ }
+ vector
+ }.asNondeterministic
+ checkVectorSizeUDF(col(localInputCol))
+ case VectorSizeHint.SKIP_INVALID =>
+ val checkVectorSizeUDF = udf { vector: Vector =>
--- End diff --
I feel this is a bug, let me look into it
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]