Repository: spark Updated Branches: refs/heads/master 054ddb2f5 -> a28728a9a
[SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting MapType to json for PySpark and SparkR ## What changes were proposed in this pull request? In previous work SPARK-21513, we have allowed `MapType` and `ArrayType` of `MapType`s to be converted to a JSON string but only for the Scala API. In this follow-up PR, we will make SparkSQL support it for PySpark and SparkR, too. We also fix some minor bugs and comments of the previous work in this follow-up PR. ### For PySpark ``` >>> data = [(1, {"name": "Alice"})] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() [Row(json=u'{"name":"Alice"}')] >>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() [Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')] ``` ### For SparkR ``` # Converts a map into a JSON object df2 <- sql("SELECT map('name', 'Bob') as people") df2 <- mutate(df2, people_json = to_json(df2$people)) # Converts an array of maps into a JSON array df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people") df2 <- mutate(df2, people_json = to_json(df2$people)) ``` ## How was this patch tested? Add unit test cases. cc viirya HyukjinKwon Author: goldmedal <liugs...@gmail.com> Closes #19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a28728a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a28728a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a28728a9 Branch: refs/heads/master Commit: a28728a9afcff94194147573e07f6f4d0463687e Parents: 054ddb2 Author: goldmedal <liugs...@gmail.com> Authored: Fri Sep 15 11:53:10 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Fri Sep 15 11:53:10 2017 +0900 ---------------------------------------------------------------------- R/pkg/R/functions.R | 16 +++++++++++--- R/pkg/tests/fulltests/test_sparkSQL.R | 8 +++++++ python/pyspark/sql/functions.py | 22 ++++++++++++++------ .../catalyst/expressions/jsonExpressions.scala | 8 +++---- .../sql/catalyst/json/JacksonGenerator.scala | 2 +- .../sql-tests/results/json-functions.sql.out | 8 +++---- 6 files changed, 46 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/R/pkg/R/functions.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 5a46d73..e92e1fd 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -176,7 +176,8 @@ NULL #' #' @param x Column to compute on. Note the difference in the following methods: #' \itemize{ -#' \item \code{to_json}: it is the column containing the struct or array of the structs. +#' \item \code{to_json}: it is the column containing the struct, array of the structs, +#' the map or array of maps. #' \item \code{from_json}: it is the column containing the JSON string. #' } #' @param ... additional argument(s). 
In \code{to_json} and \code{from_json}, this contains @@ -1700,8 +1701,9 @@ setMethod("to_date", }) #' @details -#' \code{to_json}: Converts a column containing a \code{structType} or array of \code{structType} -#' into a Column of JSON string. Resolving the Column can fail if an unsupported type is encountered. +#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType}, +#' a \code{mapType} or array of \code{mapType} into a Column of JSON string. +#' Resolving the Column can fail if an unsupported type is encountered. #' #' @rdname column_collection_functions #' @aliases to_json to_json,Column-method @@ -1715,6 +1717,14 @@ setMethod("to_date", #' #' # Converts an array of structs into a JSON array #' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people") +#' df2 <- mutate(df2, people_json = to_json(df2$people)) +#' +#' # Converts a map into a JSON object +#' df2 <- sql("SELECT map('name', 'Bob')) as people") +#' df2 <- mutate(df2, people_json = to_json(df2$people)) +#' +#' # Converts an array of maps into a JSON array +#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people") #' df2 <- mutate(df2, people_json = to_json(df2$people))} #' @note to_json since 2.2.0 setMethod("to_json", signature(x = "Column"), http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/R/pkg/tests/fulltests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 7abc872..85a7e08 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1491,6 +1491,14 @@ test_that("column functions", { j <- collect(select(df, alias(to_json(df$people), "json"))) expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]") + df <- sql("SELECT map('name', 'Bob') as people") + j <- collect(select(df, 
alias(to_json(df$people), "json"))) + expect_equal(j[order(j$json), ][1], "{\"name\":\"Bob\"}") + + df <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people") + j <- collect(select(df, alias(to_json(df$people), "json"))) + expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]") + df <- read.json(mapTypeJsonPath) j <- collect(select(df, alias(to_json(df$info), "json"))) expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}") http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 0e76182..399bef0 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1884,9 +1884,9 @@ def json_tuple(col, *fields): @since(2.1) def from_json(col, schema, options={}): """ - Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]] - of [[StructType]]s with the specified schema. Returns `null`, in the case of an unparseable - string. + Parses a column containing a JSON string into a :class:`StructType` or :class:`ArrayType` + of :class:`StructType`\\s with the specified schema. Returns `null`, in the case of an + unparseable string. :param col: string column in json format :param schema: a StructType or ArrayType of StructType to use when parsing the json column. @@ -1921,10 +1921,12 @@ def from_json(col, schema, options={}): @since(2.1) def to_json(col, options={}): """ - Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a - JSON string. Throws an exception, in the case of an unsupported type. + Converts a column containing a :class:`StructType`, :class:`ArrayType` of + :class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s + into a JSON string. Throws an exception, in the case of an unsupported type. 
- :param col: name of column containing the struct or array of the structs + :param col: name of column containing the struct, array of the structs, the map or + array of the maps. :param options: options to control converting. accepts the same options as the json datasource >>> from pyspark.sql import Row @@ -1937,6 +1939,14 @@ def to_json(col, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() [Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')] + >>> data = [(1, {"name": "Alice"})] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(to_json(df.value).alias("json")).collect() + [Row(json=u'{"name":"Alice"}')] + >>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(to_json(df.value).alias("json")).collect() + [Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')] """ sc = SparkContext._active_spark_context http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 1341631..18b4fed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -618,13 +618,13 @@ case class JsonToStructs( {"time":"26/08/2015"} > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)); [{"a":1,"b":2}] - > SELECT _FUNC_(map('a',named_struct('b',1))); + > SELECT _FUNC_(map('a', named_struct('b', 1))); {"a":{"b":1}} - > SELECT _FUNC_(map(named_struct('a',1),named_struct('b',2))); + > SELECT _FUNC_(map(named_struct('a', 
1),named_struct('b', 2))); {"[1]":{"b":2}} - > SELECT _FUNC_(map('a',1)); + > SELECT _FUNC_(map('a', 1)); {"a":1} - > SELECT _FUNC_(array((map('a',1)))); + > SELECT _FUNC_(array((map('a', 1)))); [{"a":1}] """, since = "2.2.0") http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index dfe7e28..eb06e4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -43,7 +43,7 @@ private[sql] class JacksonGenerator( private type ValueWriter = (SpecializedGetters, Int) => Unit // `JackGenerator` can only be initialized with a `StructType` or a `MapType`. 
- require(dataType.isInstanceOf[StructType] | dataType.isInstanceOf[MapType], + require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType], "JacksonGenerator only supports to be initialized with a StructType " + s"or MapType but got ${dataType.simpleString}") http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index dcced79..d9dc728 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -26,13 +26,13 @@ Extended Usage: {"time":"26/08/2015"} > SELECT to_json(array(named_struct('a', 1, 'b', 2)); [{"a":1,"b":2}] - > SELECT to_json(map('a',named_struct('b',1))); + > SELECT to_json(map('a', named_struct('b', 1))); {"a":{"b":1}} - > SELECT to_json(map(named_struct('a',1),named_struct('b',2))); + > SELECT to_json(map(named_struct('a', 1),named_struct('b', 2))); {"[1]":{"b":2}} - > SELECT to_json(map('a',1)); + > SELECT to_json(map('a', 1)); {"a":1} - > SELECT to_json(array((map('a',1)))); + > SELECT to_json(array((map('a', 1)))); [{"a":1}] Since: 2.2.0 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org