Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/5282#discussion_r27750295
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala ---
@@ -192,6 +192,128 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
*/
def fill(valueMap: Map[String, Any]): DataFrame = fill0(valueMap.toSeq)
+ /**
+ * Replaces values matching keys in `replacement` map with the corresponding values.
+ * Key and value of `replacement` map must have the same type, and can only be doubles or strings.
+ * If `col` is "*", then the replacement is applied on all string columns or numeric columns.
+ *
+ * {{{
+ * import com.google.common.collect.ImmutableMap;
+ *
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height".
+ * df.replace("height", ImmutableMap.of(1.0, 2.0));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
+ * df.replace("name", ImmutableMap.of("UNKNOWN", "unnamed"));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
+ * df.replace("*", ImmutableMap.of("UNKNOWN", "unnamed"));
+ * }}}
+ *
+ * @param col name of the column to apply the value replacement
+ * @param replacement value replacement map, as explained above
+ */
+ def replace[T](col: String, replacement: java.util.Map[T, T]): DataFrame = {
+ replace[T](col, replacement.toMap : Map[T, T])
+ }
+
+ /**
+ * Replaces values matching keys in `replacement` map with the corresponding values.
+ * Key and value of `replacement` map must have the same type, and can only be doubles or strings.
+ *
+ * {{{
+ * import com.google.common.collect.ImmutableMap;
+ *
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight".
+ * df.replace(new String[] {"height", "weight"}, ImmutableMap.of(1.0, 2.0));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "firstname" and "lastname".
+ * df.replace(new String[] {"firstname", "lastname"}, ImmutableMap.of("UNKNOWN", "unnamed"));
+ * }}}
+ *
+ * @param cols list of columns to apply the value replacement
+ * @param replacement value replacement map, as explained above
+ */
+ def replace[T](cols: Array[String], replacement: java.util.Map[T, T]): DataFrame = {
+ replace(cols.toSeq, replacement.toMap)
+ }
+
+ /**
+ * (Scala-specific) Replaces values matching keys in `replacement` map.
+ * Key and value of `replacement` map must have the same type, and can only be doubles or strings.
+ * If `col` is "*", then the replacement is applied on all string columns or numeric columns.
+ *
+ * {{{
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height".
+ * df.replace("height", Map(1.0 -> 2.0))
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
+ * df.replace("name", Map("UNKNOWN" -> "unnamed"))
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
+ * df.replace("*", Map("UNKNOWN" -> "unnamed"))
+ * }}}
+ *
+ * @param col name of the column to apply the value replacement
+ * @param replacement value replacement map, as explained above
+ */
+ def replace[T](col: String, replacement: Map[T, T]): DataFrame = {
+ if (col == "*") {
+ replace0(df.columns, replacement)
+ } else {
+ replace0(Seq(col), replacement)
+ }
+ }
+
+ /**
+ * (Scala-specific) Replaces values matching keys in `replacement` map.
+ * Key and value of `replacement` map must have the same type, and can only be doubles or strings.
+ *
+ * {{{
+ * // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight".
+ * df.replace("height" :: "weight" :: Nil, Map(1.0 -> 2.0));
+ *
+ * // Replaces all occurrences of "UNKNOWN" with "unnamed" in column "firstname" and "lastname".
+ * df.replace("firstname" :: "lastname" :: Nil, Map("UNKNOWN" -> "unnamed"));
+ * }}}
+ *
+ * @param cols list of columns to apply the value replacement
+ * @param replacement value replacement map, as explained above
+ */
+ def replace[T](cols: Seq[String], replacement: Map[T, T]): DataFrame = replace0(cols, replacement)
+
+ private def replace0[T](cols: Seq[String], replacement: Map[T, T]): DataFrame = {
+ if (replacement.isEmpty || cols.isEmpty) {
+ return df
+ }
+
+ // replacementMap is either Map[String, String] or Map[Double, Double]
+ val replacementMap: Map[_, _] = replacement.head._2 match {
+ case v: String => replacement
+ case _ => replacement.map { case (k, v) => (convertToDouble(k), convertToDouble(v)) }
+ }
+
+ // targetColumnType is either DoubleType or StringType
+ val targetColumnType = replacement.head._1 match {
+ case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long => DoubleType
+ case _: String => StringType
+ }
+
+ val columnEquals = df.sqlContext.analyzer.resolver
+ val projections = df.schema.fields.map { f =>
+ val shouldReplace = cols.exists(colName => columnEquals(colName, f.name))
--- End diff ---
I think it would be better to use the logic from the analyzer directly so
that things like backtick escaping and star expansion work as one would expect.
I think basically you want to use the parser on the strings, construct a
project, resolve that, and then transform the project list.
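For illustration, a rough sketch of that approach could look like the following. Catalyst internals vary across Spark versions, so the exact entry points used below (`UnresolvedAttribute`, `Project`, and invoking the analyzer via `execute`) are assumptions, and `resolveReplaceColumns` is a hypothetical helper rather than anything in this PR:

{{{
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.Project

// Hypothetical helper: resolve the user-supplied column strings through the
// analyzer instead of string-matching them against df.schema, so that names
// resolve the same way they do in select().
private def resolveReplaceColumns(cols: Seq[String]): Seq[NamedExpression] = {
  val unresolved = cols.map(name => UnresolvedAttribute(name))
  // Wrap the unresolved attributes in a Project over the analyzed plan and
  // run the analyzer on it (some versions expose this as apply rather than
  // execute).
  val resolved = df.sqlContext.analyzer.execute(
    Project(unresolved, df.queryExecution.analyzed))
  // The resolved project list can then be transformed, e.g. each attribute
  // wrapped in the replacement expression.
  resolved match {
    case Project(projectList, _) => projectList
    case other => other.output
  }
}
}}}

Handling "*" and backtick-escaped names would additionally need the projection parser (along the lines of how selectExpr handles its strings) rather than plain UnresolvedAttribute construction, which is the "use the parser on the strings" part of the suggestion.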