This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7c81bdf1ed17 [SPARK-47345][SQL][TESTS] Xml functions suite 7c81bdf1ed17 is described below commit 7c81bdf1ed17df31ec6d7a3ee9f18b73d8ae2bd6 Author: Yousof Hosny <yousof.ho...@databricks.com> AuthorDate: Fri Mar 15 22:56:29 2024 +0500 [SPARK-47345][SQL][TESTS] Xml functions suite ### What changes were proposed in this pull request? Convert JsonFunctionsSuite.scala to XML equivalent. Note that XML doesn’t implement all json functions like json_tuple, get_json_object, etc. ### Why are the changes needed? Improve unit test coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45466 from yhosny/xml-functions-suite. Authored-by: Yousof Hosny <yousof.ho...@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../org/apache/spark/sql/XmlFunctionsSuite.scala | 480 +++++++++++++++++++++ 1 file changed, 480 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala new file mode 100644 index 000000000000..fcfbebaa61ec --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.text.SimpleDateFormat +import java.util.Locale + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class XmlFunctionsSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + test("from_xml") { + val df = Seq("""<ROW><a>1</a></ROW>""").toDS() + val schema = new StructType().add("a", IntegerType) + + checkAnswer( + df.select(from_xml($"value", schema)), + Row(Row(1)) :: Nil) + } + + test("from_xml with option (timestampFormat)") { + val df = Seq("""<ROW><time>26/08/2015 18:00</time></ROW>""").toDS() + val schema = new StructType().add("time", TimestampType) + val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm").asJava + + checkAnswer( + df.select(from_xml($"value", schema, options)), + Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) + } + + test("from_xml with option (rowTag)") { + val df = Seq("""<foo><a>1</a></foo>""").toDS() + val schema = new StructType().add("a", IntegerType) + val options = Map("rowTag" -> "foo").asJava + + checkAnswer( + df.select(from_xml($"value", schema)), + Row(Row(1)) :: Nil) + } + + test("from_xml with option (dateFormat)") { + val df = Seq("""<ROW><time>26/08/2015</time></ROW>""").toDS() + val schema = new StructType().add("time", DateType) + val options = Map("dateFormat" -> "dd/MM/yyyy").asJava + + checkAnswer( + df.select(from_xml($"value", schema, 
options)), + Row(Row(java.sql.Date.valueOf("2015-08-26")))) + } + + test("from_xml missing columns") { + val df = Seq("""<ROW><a>1</a></ROW>""").toDS() + val schema = new StructType().add("b", IntegerType) + + checkAnswer( + df.select(from_xml($"value", schema)), + Row(Row(null)) :: Nil) + } + + test("from_xml invalid xml") { + val df = Seq("""<ROW><a>1</ROW>""").toDS() + val schema = new StructType().add("a", IntegerType) + + checkAnswer( + df.select(from_xml($"value", schema)), + Row(Row(null)) :: Nil) + } + + test("from_xml - xml doesn't conform to the array type") { + val df = Seq("""<ROW><a>1</ROW>""").toDS() + val schema = StructType(StructField("a", ArrayType(IntegerType)) :: Nil) + + checkAnswer(df.select(from_xml($"value", schema)), Row(Row(null))) + } + + test("from_xml array support") { + val df = Seq(s"""<ROW> <a>1</a> <a>2</a> </ROW>""".stripMargin).toDS() + val schema = StructType(StructField("a", ArrayType(IntegerType)) :: Nil) + + checkAnswer( + df.select(from_xml($"value", schema)), + Row(Row(Array(1, 2)))) + } + + test("from_xml uses DDL strings for defining a schema - java") { + val df = Seq("""<ROW> <a>1</a> <b>haa</b> </ROW>""").toDS() + checkAnswer( + df.select(from_xml($"value", "a INT, b STRING", new java.util.HashMap[String, String]())), + Row(Row(1, "haa")) :: Nil) + } + + test("to_xml - struct") { + val schema = StructType(StructField("a", IntegerType, nullable = false) :: Nil) + val data = Seq(Row(1)) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + .withColumn("a", struct($"a")) + + val expected = + s"""|<ROW> + | <a>1</a> + |</ROW>""".stripMargin + checkAnswer( + df.select(to_xml($"a")), + Row(expected) :: Nil) + } + + test("to_xml with option (timestampFormat)") { + val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") + val schema = StructType(StructField("a", TimestampType, nullable = false) :: Nil) + val data = Seq(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))) + val df = 
spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + .withColumn("a", struct($"a")) + + val expected = + s"""|<ROW> + | <a>26/08/2015 18:00</a> + |</ROW>""".stripMargin + checkAnswer( + df.select(to_xml($"a", options.asJava)), + Row(expected) :: Nil) + } + + test("to_xml with option (dateFormat)") { + val options = Map("dateFormat" -> "dd/MM/yyyy") + val schema = StructType(StructField("a", DateType, nullable = false) :: Nil) + val data = Seq(Row(java.sql.Date.valueOf("2015-08-26"))) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + .withColumn("a", struct($"a")) + + val expected = + s"""|<ROW> + | <a>26/08/2015</a> + |</ROW>""".stripMargin + checkAnswer( + df.select(to_xml($"a", options.asJava)), + Row(expected) :: Nil) + } + + test("roundtrip in to_xml and from_xml - struct") { + val schemaOne = StructType(StructField("a", IntegerType, nullable = false) :: Nil) + val dataOne = Seq(Row(1, 2, 3)) + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(dataOne), schemaOne) + .withColumn("a", struct($"a")) + val readBackOne = df1.select(to_xml($"a").as("xml")) + .select(from_xml($"xml", schemaOne).as("a")) + checkAnswer(df1, readBackOne) + + val xml = + s"""|<ROW> + | <a>1</a> + |</ROW>""".stripMargin + val schemaTwo = new StructType().add("a", IntegerType) + val dfTwo = Seq(Some(xml), None).toDF("xml") + val readBackTwo = dfTwo.select(from_xml($"xml", schemaTwo).as("struct")) + .select(to_xml($"struct").as("xml")) + checkAnswer(dfTwo, readBackTwo) + } + + test("roundtrip in to_xml and from_xml - array") { + val schemaOne = StructType(StructField("a", ArrayType(IntegerType), nullable = false) :: Nil) + val dataOne = Seq(Row(Array(1, 2, 3))) + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(dataOne), schemaOne) + .withColumn("a", struct($"a")) + val readBackOne = df1.select(to_xml($"a").as("xml")) + .select(from_xml($"xml", schemaOne).as("a")) + checkAnswer(df1, readBackOne) + + val xml = + 
s"""|<ROW> + | <a>1</a> + | <a>2</a> + |</ROW>""".stripMargin + val schemaTwo = new StructType().add("a", ArrayType(IntegerType)) + val dfTwo = Seq(Some(xml), None).toDF("xml") + val readBackTwo = dfTwo.select(from_xml($"xml", schemaTwo).as("struct")) + .select(to_xml($"struct").as("xml")) + checkAnswer(dfTwo, readBackTwo) + } + + test("Support to_xml in SQL") { + val schemaOne = StructType(StructField("a", IntegerType, nullable = false) :: Nil) + val dataOne = Seq(Row(1)) + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(dataOne), schemaOne) + .withColumn("a", struct($"a")) + val xml = + s"""|<ROW> + | <a>1</a> + |</ROW>""".stripMargin + checkAnswer ( + df1.selectExpr("to_xml(a)"), + Row(xml) :: Nil) + + val xml2 = + s"""|<ROW> + | <a>26/08/2015 18:00</a> + |</ROW>""".stripMargin + val schema2 = StructType(StructField("a", TimestampType, nullable = false) :: Nil) + val dataTwo = Seq(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))) + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(dataTwo), schema2) + .withColumn("a", struct($"a")) + checkAnswer( + df2.selectExpr("to_xml(a, map('timestampFormat', 'dd/MM/yyyy HH:mm'))"), + Row(xml2) :: Nil) + + checkError( + exception = intercept[AnalysisException] { + df2.selectExpr("to_xml(a, named_struct('a', 1))") + }, + errorClass = "INVALID_OPTIONS.NON_MAP_FUNCTION", + parameters = Map.empty, + context = ExpectedContext( + fragment = "to_xml(a, named_struct('a', 1))", + start = 0, + stop = 30 + ) + ) + + checkError( + exception = intercept[AnalysisException] { + df2.selectExpr("to_xml(a, map('a', 1))") + }, + errorClass = "INVALID_OPTIONS.NON_STRING_TYPE", + parameters = Map("mapType" -> "\"MAP<STRING, INT>\""), + context = ExpectedContext( + fragment = "to_xml(a, map('a', 1))", + start = 0, + stop = 21 + ) + ) + } + + test("Support from_xml in SQL") { + val xml = + s"""|<ROW> + | <a>1</a> + |</ROW>""".stripMargin + val df1 = Seq(xml).toDS() + checkAnswer( + 
df1.selectExpr("from_xml(value, 'a INT')"), + Row(Row(1)) :: Nil) + + val xml2 = + s"""|<ROW> + | <c0>a</c0> + | <c1>1</c1> + | <c2> + | <c20> + | 3.8 + | </c20> + | <c21> + | 8 + | </c21> + | </c2> + |</ROW>""".stripMargin + val df2 = Seq(xml2).toDS() + checkAnswer( + df2.selectExpr("from_xml(value, 'c0 STRING, c1 INT, c2 STRUCT<c20: DOUBLE, c21: INT>')"), + Row(Row("a", 1, Row(3.8, 8))) :: Nil) + + val xml3 = + s"""|<ROW> + | <time>26/08/2015 18:00</time> + |</ROW>""".stripMargin + val df3 = Seq(xml3).toDS() + checkAnswer( + df3.selectExpr( + "from_xml(value, 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"), + Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) + + checkError( + exception = intercept[AnalysisException] { + df3.selectExpr("from_xml(value, 1)") + }, + errorClass = "INVALID_SCHEMA.NON_STRING_LITERAL", + parameters = Map("inputSchema" -> "\"1\""), + context = ExpectedContext( + fragment = "from_xml(value, 1)", + start = 0, + stop = 17 + ) + ) + checkError( + exception = intercept[AnalysisException] { + df3.selectExpr("""from_xml(value, 'time InvalidType')""") + }, + errorClass = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map( + "error" -> "'InvalidType'", + "hint" -> ": extra input 'InvalidType'" + ), + context = ExpectedContext( + fragment = "from_xml(value, 'time InvalidType')", + start = 0, + stop = 34 + ) + ) + checkError( + exception = intercept[AnalysisException] { + df3.selectExpr("from_xml(value, 'time Timestamp', named_struct('a', 1))") + }, + errorClass = "INVALID_OPTIONS.NON_MAP_FUNCTION", + parameters = Map.empty, + context = ExpectedContext( + fragment = "from_xml(value, 'time Timestamp', named_struct('a', 1))", + start = 0, + stop = 54 + ) + ) + checkError( + exception = intercept[AnalysisException] { + df3.selectExpr("from_xml(value, 'time Timestamp', map('a', 1))") + }, + errorClass = "INVALID_OPTIONS.NON_STRING_TYPE", + parameters = Map("mapType" -> "\"MAP<STRING, INT>\""), + context = 
ExpectedContext( + fragment = "from_xml(value, 'time Timestamp', map('a', 1))", + start = 0, + stop = 45 + ) + ) + } + + test("infers schemas of xml strings and pass them to from_xml") { + val xml = + s"""|<ROW> + | <a>1</a> + | <a>2</a> + | <a>3</a> + |</ROW>""".stripMargin + val in = Seq(xml).toDS() + val out = in.select(from_xml($"value", schema_of_xml(xml)) as "parsed") + val expected = StructType(StructField( + "parsed", + StructType(StructField( + "a", + ArrayType(LongType, true), true) :: Nil), + true) :: Nil) + + assert(out.schema == expected) + } + + test("infers schemas using options") { + val df = spark.range(1) + .select(schema_of_xml(lit("<ROW><a>1</a></ROW>"), + Map("allowUnquotedFieldNames" -> "true").asJava)) + checkAnswer(df, Seq(Row("STRUCT<a: BIGINT>"))) + } + + test("from_xml invalid xml - check modes") { + withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("_unparsed", StringType) + val badRec = """<ROW><a>1<b><2></b></ROW>""" + val df = Seq(badRec, """<ROW><a>2</a><b>12</b></ROW>""").toDS() + + checkAnswer( + df.select(from_xml($"value", schema, Map("mode" -> "PERMISSIVE").asJava)), + Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil) + } + } + + test("corrupt record column in the middle") { + val schema = new StructType() + .add("a", IntegerType) + .add("_unparsed", StringType) + .add("b", IntegerType) + val badRec = """<ROW><a>1</a><b>2</ROW>""" + val df = Seq(badRec, """<ROW><a>1</a><b>12</b></ROW>""").toDS() + + checkAnswer( + df.select(from_xml($"value", schema, Map("columnNameOfCorruptRecord" -> "_unparsed").asJava)), + Row(Row(null, badRec, null)) :: Row(Row(1, null, null)) :: Nil) + } + + test("parse timestamps with locale") { + Seq("en-US", "ko-KR", "zh-CN", "ru-RU").foreach { langTag => + val locale = Locale.forLanguageTag(langTag) + val ts = new SimpleDateFormat("dd/MM/yyyy HH:mm").parse("06/11/2018 18:00") + val 
timestampFormat = "dd MMM yyyy HH:mm" + val sdf = new SimpleDateFormat(timestampFormat, locale) + val input = Seq(s"""<ROW><time>${sdf.format(ts)}</time></ROW>""").toDS() + val options = Map("timestampFormat" -> timestampFormat, "locale" -> langTag).asJava + val df = input.select(from_xml($"value", "time timestamp", options)) + + checkAnswer(df, Row(Row(java.sql.Timestamp.valueOf("2018-11-06 18:00:00.0")))) + } + } + + test("from_xml - timestamp in micros") { + val df = Seq("""<ROW><time>1970-01-01T00:00:00.123456</time></ROW>""").toDS() + val schema = new StructType().add("time", TimestampType) + val options = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSSSS").asJava + + checkAnswer( + df.select(from_xml($"value", schema, options)), + Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456")))) + } + + test("to_xml - timestamp in micros") { + val s = "2019-11-18 11:56:00.123456" + val xml = + s"""|<ROW> + | <t>$s</t> + |</ROW>""".stripMargin + val df = Seq(java.sql.Timestamp.valueOf(s)).toDF("t").select( + to_xml(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS").asJava)) + checkAnswer(df, Row(xml)) + } + + test("support foldable schema by from_xml") { + val options = Map[String, String]().asJava + val schema = regexp_replace(lit("dpt_org_id INT, dpt_org_city STRING"), "dpt_org_", "") + checkAnswer( + Seq("""<ROW><id>1</id><city>Moscow</city></ROW>""").toDS() + .select(from_xml($"value", schema, options)), + Row(Row(1, "Moscow"))) + + checkError( + exception = intercept[AnalysisException] { + Seq(("""<ROW><i>1</i></ROW>""", "i int")).toDF("xml", "schema") + .select(from_xml($"xml", $"schema", options)).collect() + }, + errorClass = "INVALID_SCHEMA.NON_STRING_LITERAL", + parameters = Map("inputSchema" -> "\"schema\""), + context = ExpectedContext(fragment = "from_xml", getCurrentClassCallSitePattern) + ) + } + + test("schema_of_xml - infers the schema of foldable JSON string") { + val input = regexp_replace( + 
lit("""<ROW><item_id>1</item_id><item_price>0.1</item_price></ROW>"""), "item_", "") + checkAnswer( + spark.range(1).select(schema_of_xml(input)), + Seq(Row("STRUCT<id: BIGINT, price: DOUBLE>"))) + } + + test("schema_of_xml - empty string as string") { + Seq("""<ROW><id></id></ROW>""").foreach { input => + checkAnswer( + spark.range(1).select(schema_of_xml(input)), + Seq(Row("STRUCT<id: STRING>"))) + } + } + + test("optional datetime parser does not affect xml time formatting") { + val s = "2015-08-26 12:34:46" + def toDF(p: String): DataFrame = sql( + s""" + |SELECT + | to_xml( + | named_struct('time', timestamp'$s'), map('timestampFormat', "$p") + | ) + | """.stripMargin) + checkAnswer(toDF("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), toDF("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org