This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 00d2b4fa2de [SPARK-44790][SQL] XML: to_xml implementation and bindings for Python, Connect and SQL

00d2b4fa2de is described below

commit 00d2b4fa2def948e7517bacfce7c75be6a37eb20
Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
AuthorDate: Mon Oct 30 14:00:31 2023 +0900

    [SPARK-44790][SQL] XML: to_xml implementation and bindings for Python, Connect and SQL

    ### What changes were proposed in this pull request?
    to_xml: converts a `StructType` to an XML output string, with bindings for Python, Connect and SQL.

    ### Why are the changes needed?
    to_xml converts a `StructType` to an XML output string, the inverse of `from_xml`.

    ### Does this PR introduce _any_ user-facing change?
    Yes, a new to_xml API.

    ### How was this patch tested?
    New unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #43503 from sandip-db/to_xml.

    Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     |  31 +++
 .../org/apache/spark/sql/FunctionTestSuite.scala   |   6 +
 .../source/reference/pyspark.sql/functions.rst     |   1 +
 python/pyspark/sql/connect/functions.py            |  10 +
 python/pyspark/sql/functions.py                    |  36 +++
 .../sql/tests/connect/test_connect_function.py     |  14 +
 sql/catalyst/pom.xml                               |   4 +
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   3 +-
 .../sql/catalyst/expressions/xmlExpressions.scala  |  90 ++++++-
 .../spark/sql/catalyst/xml/StaxXmlGenerator.scala  | 295 ++++++++++++---------
 .../apache/spark/sql/catalyst/xml/XmlOptions.scala |   5 +
 sql/core/pom.xml                                   |   4 -
 .../datasources/xml/XmlOutputWriter.scala          |  53 +---
 .../scala/org/apache/spark/sql/functions.scala     |  30 +++
 .../sql-functions/sql-expression-schema.md         |   1 +
 .../analyzer-results/xml-functions.sql.out         | 122 +++++++++
 .../resources/sql-tests/inputs/xml-functions.sql   |  18 +-
 .../sql-tests/results/xml-functions.sql.out        | 134 ++++++++++
 .../sql/execution/datasources/xml/XmlSuite.scala   |  25 ++
 19 files changed, 696 insertions(+), 186 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 9c5adca7e28..1c8f5993d29 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7470,6 +7470,37 @@ object functions {
     fnWithOptions("schema_of_xml", options.asScala.iterator, xml)
   }

+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Converts a column containing a `StructType` into an XML string with the
+   * specified schema. Throws an exception in the case of an unsupported type.
+   *
+   * @param e
+   *   a column containing a struct.
+   * @param options
+   *   options to control how the struct column is converted into an XML string. It accepts the
+   *   same options as the XML data source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group xml_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def to_xml(e: Column, options: java.util.Map[String, String]): Column =
+    fnWithOptions("to_xml", options.asScala.iterator, e)
+
+  /**
+   * Converts a column containing a `StructType` into an XML string with the specified schema.
+   * Throws an exception in the case of an unsupported type.
+   *
+   * @param e
+   *   a column containing a struct.
+   * @group xml_funcs
+   * @since 4.0.0
+   */
+  def to_xml(e: Column): Column = to_xml(e, Collections.emptyMap())
+
   /**
    * Returns the total number of elements in the array. The function returns null for null input.
    *
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index e350bde9946..748843ec991 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -237,6 +237,12 @@ class FunctionTestSuite extends ConnectFunSuite {
     from_xml(a, schema, Map.empty[String, String].asJava),
     from_xml(a, schema, Collections.emptyMap[String, String]),
     from_xml(a, lit(schema.json), Collections.emptyMap[String, String]))
+  testEquals(
+    "schema_of_xml",
+    schema_of_xml(lit("<p><a>1.0</a><b>test</b></p>")),
+    schema_of_xml("<p><a>1.0</a><b>test</b></p>"),
+    schema_of_xml(lit("<p><a>1.0</a><b>test</b></p>"), Collections.emptyMap()))
+  testEquals("to_xml", to_xml(a), to_xml(a, Collections.emptyMap[String, String]))

   testEquals(
     "from_avro",
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 5e05dac7bc3..4dc10cc1556 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -547,6 +547,7 @@ XML Functions

     from_xml
     schema_of_xml
+    to_xml
     xpath
     xpath_boolean
     xpath_double
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index f065e5391fe..38eb814247c 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2200,6 +2200,16 @@ def to_json(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Co
 to_json.__doc__ = pysparkfuncs.to_json.__doc__


+def to_xml(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
+    if options is None:
+        return _invoke_function("to_xml", _to_col(col))
+    else:
+        return _invoke_function("to_xml", _to_col(col), _options_to_col(options))
+
+
+to_xml.__doc__ = pysparkfuncs.to_xml.__doc__
+
+
 def transform(
     col: "ColumnOrName",
     f: Union[Callable[[Column], Column], Callable[[Column, Column], Column]],
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 05c22685b09..869506a3558 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -13678,6 +13678,42 @@ def schema_of_xml(xml: "ColumnOrName", options: Optional[Dict[str, str]] = None)
     return _invoke_function("schema_of_xml", col, _options_to_str(options))


+@_try_remote_functions
+def to_xml(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
+    """
+    Converts a column containing a :class:`StructType` into an XML string.
+    Throws an exception in the case of an unsupported type.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column containing a struct.
+    options : dict, optional
+        options to control converting. Accepts the same options as the XML datasource.
+        See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option>`_
+        for the version you use.
+
+        .. # noqa
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        an XML string converted from the given :class:`StructType`.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Row
+    >>> data = [(1, Row(age=2, name='Alice'))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_xml(df.value, {'rowTag':'person'}).alias("xml")).collect()
+    [Row(xml='<person>\\n    <age>2</age>\\n    <name>Alice</name>\\n</person>')]
+    """
+
+    return _invoke_function("to_xml", _to_java_column(col), _options_to_str(options))
+
+
 @_try_remote_functions
 def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
     """
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index bc0cf162648..9adae0f6f75 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -1879,6 +1879,10 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S
             cdf.select(CF.from_xml("b", schema, {"mode": "FAILFAST"})),
             sdf.select(SF.from_xml("b", schema, {"mode": "FAILFAST"})),
         )
+        self.compare_by_show(
+            cdf.select(CF.to_xml(CF.struct(CF.from_xml("b", schema)), {"rowTag": "person"})),
+            sdf.select(SF.to_xml(SF.struct(SF.from_xml("b", schema)), {"rowTag": "person"})),
+        )

         c_schema = CF.schema_of_xml(CF.lit("""<p><a>1</a></p>"""))
         s_schema = SF.schema_of_xml(SF.lit("""<p><a>1</a></p>"""))
@@ -1923,6 +1927,16 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S
             ).toPandas(),
         )

+        # test to_xml
+        self.compare_by_show(
+            cdf.select(CF.to_xml(CF.struct(CF.lit("a"), CF.lit("b")))),
+            sdf.select(SF.to_xml(SF.struct(SF.lit("a"), SF.lit("b")))),
+        )
+        self.compare_by_show(
+            cdf.select(CF.to_xml(CF.struct(CF.lit("a"), CF.lit("b")), {"mode": "FAILFAST"})),
+            sdf.select(SF.to_xml(SF.struct(SF.lit("a"), SF.lit("b")), {"mode": "FAILFAST"})),
+        )
+
     def test_string_functions_one_arg(self):
         query = """
             SELECT * FROM VALUES
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 8f2b9ccffeb..e7f8cbe0fe6 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -113,6 +113,10 @@
       <groupId>org.apache.ws.xmlschema</groupId>
       <artifactId>xmlschema-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jaxb</groupId>
+      <artifactId>txw2</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.datasketches</groupId>
       <artifactId>datasketches-java</artifactId>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 8be3199ef9b..1449764cdd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -834,7 +834,8 @@ object FunctionRegistry {
     // Xml
     expression[XmlToStructs]("from_xml"),
-    expression[SchemaOfXml]("schema_of_xml")
+    expression[SchemaOfXml]("schema_of_xml"),
+    expression[StructsToXml]("to_xml")
   )

   val builtin: SimpleFunctionRegistry = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
index df63429ae33..047b669fc89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
@@ -16,13 +16,15 @@
  */
 package
org.apache.spark.sql.catalyst.expressions +import java.io.CharArrayWriter + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode} -import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions} +import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -227,3 +229,89 @@ case class SchemaOfXml( override protected def withNewChildInternal(newChild: Expression): SchemaOfXml = copy(child = newChild) } + +/** + * Converts a [[StructType]] to a XML output string. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(expr[, options]) - Returns a XML string with a given struct value", + examples = """ + Examples: + > SELECT _FUNC_(named_struct('a', 1, 'b', 2)); + <ROW> + <a>1</a> + <b>2</b> + </ROW> + > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')); + <ROW> + <time>26/08/2015</time> + </ROW> + """, + since = "4.0.0", + group = "xml_funcs") +// scalastyle:on line.size.limit +case class StructsToXml( + options: Map[String, String], + child: Expression, + timeZoneId: Option[String] = None) + extends UnaryExpression + with TimeZoneAwareExpression + with CodegenFallback + with ExpectsInputTypes + with NullIntolerant { + override def nullable: Boolean = true + + def this(options: Map[String, String], child: Expression) = this(options, child, None) + + // Used in `FunctionRegistry` + def this(child: Expression) = this(Map.empty, child, None) + + def this(child: Expression, options: Expression) = + this( + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + @transient + lazy val writer = new CharArrayWriter() + + @transient + lazy val inputSchema: StructType = child.dataType match { + case st: StructType => st + case other => + throw new IllegalArgumentException(s"Unsupported input type ${other.catalogString}") + } + + @transient + lazy val gen = new StaxXmlGenerator( + inputSchema, writer, new XmlOptions(options, timeZoneId.get), false) + + // This converts rows to the XML output according to the given schema. 
+ @transient + lazy val converter: Any => UTF8String = { + def getAndReset(): UTF8String = { + gen.flush() + val xmlString = writer.toString + writer.reset() + UTF8String.fromString(xmlString) + } + (row: Any) => + gen.write(row.asInstanceOf[InternalRow]) + getAndReset() + } + + override def dataType: DataType = StringType + + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + + override def nullSafeEval(value: Any): Any = converter(value) + + override def inputTypes: Seq[AbstractDataType] = StructType :: Nil + + override def prettyName: String = "to_xml" + + override protected def withNewChildInternal(newChild: Expression): StructsToXml = + copy(child = newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala index f1cbc8996b0..4477cf50823 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala @@ -16,164 +16,201 @@ */ package org.apache.spark.sql.catalyst.xml +import java.io.Writer import java.sql.Timestamp -import javax.xml.stream.XMLStreamWriter +import javax.xml.stream.XMLOutputFactory import scala.collection.Map +import com.sun.xml.txw2.output.IndentingXMLStreamWriter +import org.apache.hadoop.shaded.com.ctc.wstx.api.WstxOutputProperties + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -// This class is borrowed from Spark json datasource. -private[sql] object StaxXmlGenerator { +class StaxXmlGenerator( + schema: StructType, + writer: Writer, + options: XmlOptions, + validateStructure: Boolean = true) { + + require(options.attributePrefix.nonEmpty, + "'attributePrefix' option should not be empty string.") + private val indentDisabled = options.indent == "" + + private val gen = { + val factory = XMLOutputFactory.newInstance() + // to_xml disables structure validation to allow multiple root tags + factory.setProperty(WstxOutputProperties.P_OUTPUT_VALIDATE_STRUCTURE, validateStructure) + val xmlWriter = factory.createXMLStreamWriter(writer) + if (!indentDisabled) { + val indentingXmlWriter = new IndentingXMLStreamWriter(xmlWriter) + indentingXmlWriter.setIndentStep(options.indent) + indentingXmlWriter + } else { + xmlWriter + } + } + + private var rootElementWritten: Boolean = false + def writeDeclaration(): Unit = { + // Allow a root tag to be like "rootTag foo='bar'" + // This is hacky; won't deal correctly with spaces in attributes, but want + // to make this at least work for simple cases without much complication + val rootTagTokens = options.rootTag.split(" ") + val rootElementName = rootTagTokens.head + val rootAttributes: Map[String, String] = + if (rootTagTokens.length > 1) { + rootTagTokens.tail.map { kv => + val Array(k, v) = kv.split("=") + k -> v.replaceAll("['\"]", "") + }.toMap + } else { + Map.empty + } + val declaration = options.declaration + if (declaration != null && declaration.nonEmpty) { + gen.writeProcessingInstruction("xml", declaration) + gen.writeCharacters("\n") + } + gen.writeStartElement(rootElementName) + rootAttributes.foreach { case (k, v) => + gen.writeAttribute(k, v) + } + if (indentDisabled) { + gen.writeCharacters("\n") + } + rootElementWritten = true + } + + def flush(): 
Unit = gen.flush() + + def close(): Unit = { + if (rootElementWritten) { + gen.writeEndElement() + gen.close() + } + writer.close() + } /** * Transforms a single Row to XML * - * @param schema - * the schema object used for conversion - * @param writer - * a XML writer object - * @param options - * options for XML datasource. * @param row - * The row to convert + * The row to convert */ - def apply(schema: StructType, writer: XMLStreamWriter, options: XmlOptions)( - row: InternalRow): Unit = { - - require( - options.attributePrefix.nonEmpty, - "'attributePrefix' option should not be empty string.") + def write(row: InternalRow): Unit = { + writeChildElement(options.rowTag, schema, row) + if (indentDisabled) { + gen.writeCharacters("\n") + } + } - def writeChildElement(name: String, dt: DataType, v: Any): Unit = (name, dt, v) match { + def writeChildElement(name: String, dt: DataType, v: Any): Unit = (name, dt, v) match { + // If this is meant to be value but in no child, write only a value + case (_, _, null) | (_, NullType, _) if options.nullValue == null => + // Because usually elements having `null` do not exist, just do not write + // elements when given values are `null`. + case (_, _, _) if name == options.valueTag => // If this is meant to be value but in no child, write only a value - case (_, _, null) | (_, NullType, _) if options.nullValue == null => - // Because usually elements having `null` do not exist, just do not write - // elements when given values are `null`. - case (_, _, _) if name == options.valueTag => - // If this is meant to be value but in no child, write only a value - writeElement(dt, v, options) - case (_, _, _) => - writer.writeStartElement(name) - writeElement(dt, v, options) - writer.writeEndElement() - } + writeElement(dt, v, options) + case (_, _, _) => + gen.writeStartElement(name) + writeElement(dt, v, options) + gen.writeEndElement() + } - def writeChild(name: String, dt: DataType, v: Any): Unit = { - (dt, v) match { - // If this is meant to be attribute, write an attribute - case (_, null) | (NullType, _) - if name.startsWith(options.attributePrefix) && name != options.valueTag => - Option(options.nullValue).foreach { - writer.writeAttribute(name.substring(options.attributePrefix.length), _) - } - case _ if name.startsWith(options.attributePrefix) && name != options.valueTag => - writer.writeAttribute(name.substring(options.attributePrefix.length), v.toString) - - // For ArrayType, we just need to write each as XML element. - case (ArrayType(ty, _), v: ArrayData) => - (0 until v.numElements()).foreach { i => - writeChildElement(name, ty, v.get(i, ty)) - } - // For other datatypes, we just write normal elements. 
- case _ => - writeChildElement(name, dt, v) - } - } + def writeChild(name: String, dt: DataType, v: Any): Unit = { + (dt, v) match { + // If this is meant to be attribute, write an attribute + case (_, null) | (NullType, _) + if name.startsWith(options.attributePrefix) && name != options.valueTag => + Option(options.nullValue).foreach { + gen.writeAttribute(name.substring(options.attributePrefix.length), _) + } + case _ if name.startsWith(options.attributePrefix) && name != options.valueTag => + gen.writeAttribute(name.substring(options.attributePrefix.length), v.toString) - def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) match { - case (_, null) | (NullType, _) => writer.writeCharacters(options.nullValue) - case (StringType, v: UTF8String) => writer.writeCharacters(v.toString) - case (StringType, v: String) => writer.writeCharacters(v) - case (TimestampType, v: Timestamp) => - writer.writeCharacters(options.timestampFormatterInWrite.format(v.toInstant())) - case (TimestampType, v: Long) => - writer.writeCharacters(options.timestampFormatterInWrite.format(v)) - case (DateType, v: Int) => - writer.writeCharacters(options.dateFormatterInWrite.format(v)) - case (IntegerType, v: Int) => writer.writeCharacters(v.toString) - case (ShortType, v: Short) => writer.writeCharacters(v.toString) - case (FloatType, v: Float) => writer.writeCharacters(v.toString) - case (DoubleType, v: Double) => writer.writeCharacters(v.toString) - case (LongType, v: Long) => writer.writeCharacters(v.toString) - case (DecimalType(), v: java.math.BigDecimal) => writer.writeCharacters(v.toString) - case (DecimalType(), v: Decimal) => writer.writeCharacters(v.toString) - case (ByteType, v: Byte) => writer.writeCharacters(v.toString) - case (BooleanType, v: Boolean) => writer.writeCharacters(v.toString) - - // For the case roundtrip in reading and writing XML files, [[ArrayType]] cannot have - // [[ArrayType]] as element type. It always wraps the element with [[StructType]]. So, - // this case only can happen when we convert a normal [[DataFrame]] to XML file. - // When [[ArrayType]] has [[ArrayType]] as elements, it is confusing what is element name - // for XML file. + // For ArrayType, we just need to write each as XML element. case (ArrayType(ty, _), v: ArrayData) => (0 until v.numElements()).foreach { i => - writeChild(options.arrayElementName, ty, v.get(i, ty)) + writeChildElement(name, ty, v.get(i, ty)) } + // For other datatypes, we just write normal elements. + case _ => + writeChildElement(name, dt, v) + } + } - case (MapType(_, vt, _), mv: Map[_, _]) => - val (attributes, elements) = mv.toSeq.partition { case (f, _) => - f.toString.startsWith(options.attributePrefix) && f.toString != options.valueTag - } - // We need to write attributes first before the value. 
- (attributes ++ elements).foreach { case (k, v) => - writeChild(k.toString, vt, v) - } + def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) match { + case (_, null) | (NullType, _) => gen.writeCharacters(options.nullValue) + case (StringType, v: UTF8String) => gen.writeCharacters(v.toString) + case (StringType, v: String) => gen.writeCharacters(v) + case (TimestampType, v: Timestamp) => + gen.writeCharacters(options.timestampFormatterInWrite.format(v.toInstant())) + case (TimestampType, v: Long) => + gen.writeCharacters(options.timestampFormatterInWrite.format(v)) + case (DateType, v: Int) => + gen.writeCharacters(options.dateFormatterInWrite.format(v)) + case (IntegerType, v: Int) => gen.writeCharacters(v.toString) + case (ShortType, v: Short) => gen.writeCharacters(v.toString) + case (FloatType, v: Float) => gen.writeCharacters(v.toString) + case (DoubleType, v: Double) => gen.writeCharacters(v.toString) + case (LongType, v: Long) => gen.writeCharacters(v.toString) + case (DecimalType(), v: java.math.BigDecimal) => gen.writeCharacters(v.toString) + case (DecimalType(), v: Decimal) => gen.writeCharacters(v.toString) + case (ByteType, v: Byte) => gen.writeCharacters(v.toString) + case (BooleanType, v: Boolean) => gen.writeCharacters(v.toString) + + // For the case roundtrip in reading and writing XML files, [[ArrayType]] cannot have + // [[ArrayType]] as element type. It always wraps the element with [[StructType]]. So, + // this case only can happen when we convert a normal [[DataFrame]] to XML file. + // When [[ArrayType]] has [[ArrayType]] as elements, it is confusing what is element name + // for XML file. + case (ArrayType(ty, _), v: ArrayData) => + (0 until v.numElements()).foreach { i => + writeChild(options.arrayElementName, ty, v.get(i, ty)) + } - case (mt: MapType, mv: MapData) => writeMapData(mt, mv) + case (MapType(_, vt, _), mv: Map[_, _]) => + val (attributes, elements) = mv.toSeq.partition { case (f, _) => + f.toString.startsWith(options.attributePrefix) && f.toString != options.valueTag + } + // We need to write attributes first before the value. + (attributes ++ elements).foreach { case (k, v) => + writeChild(k.toString, vt, v) + } - case (st: StructType, r: InternalRow) => - val (attributes, elements) = st.zip(r.toSeq(st)).partition { case (f, _) => - f.name.startsWith(options.attributePrefix) && f.name != options.valueTag - } - // We need to write attributes first before the value. - (attributes ++ elements).foreach { case (field, value) => - writeChild(field.name, field.dataType, value) - } + case (mt: MapType, mv: MapData) => writeMapData(mt, mv) - case (_, _) => - throw new IllegalArgumentException( - s"Failed to convert value $v (class of ${v.getClass}) in type $dt to XML.") - } - - def writeMapData(mapType: MapType, map: MapData): Unit = { - val keyArray = map.keyArray() - val valueArray = map.valueArray() - // write attributes first - Seq(true, false).foreach { writeAttribute => - (0 until map.numElements()).foreach { i => - val key = keyArray.get(i, mapType.keyType).toString - val isAttribute = key.startsWith(options.attributePrefix) && key != options.valueTag - if (writeAttribute == isAttribute) { - writeChild(key, mapType.valueType, valueArray.get(i, mapType.valueType)) - } - } + case (st: StructType, r: InternalRow) => + val (attributes, elements) = st.zip(r.toSeq(st)).partition { case (f, _) => + f.name.startsWith(options.attributePrefix) && f.name != options.valueTag + } + // We need to write attributes first before the value. 
+ (attributes ++ elements).foreach { case (field, value) => + writeChild(field.name, field.dataType, value) } - } - val rowSeq = row.toSeq(schema) - val (attributes, elements) = schema.zip(rowSeq).partition { case (f, _) => - f.name.startsWith(options.attributePrefix) && f.name != options.valueTag - } - // Writing attributes - writer.writeStartElement(options.rowTag) - attributes.foreach { - case (f, v) if v == null || f.dataType == NullType => - Option(options.nullValue).foreach { - writer.writeAttribute(f.name.substring(options.attributePrefix.length), _) + case (_, _) => + throw new IllegalArgumentException( + s"Failed to convert value $v (class of ${v.getClass}) in type $dt to XML.") + } + + def writeMapData(mapType: MapType, map: MapData): Unit = { + val keyArray = map.keyArray() + val valueArray = map.valueArray() + // write attributes first + Seq(true, false).foreach { writeAttribute => + (0 until map.numElements()).foreach { i => + val key = keyArray.get(i, mapType.keyType).toString + val isAttribute = key.startsWith(options.attributePrefix) && key != options.valueTag + if (writeAttribute == isAttribute) { + writeChild(key, mapType.valueType, valueArray.get(i, mapType.valueType)) } - case (f, v) => - writer.writeAttribute(f.name.substring(options.attributePrefix.length), v.toString) + } } - // Writing elements - val (names, values) = elements.unzip - val elementSchema = StructType(schema.filter(names.contains)) - - val elementRow = InternalRow.fromSeq(rowSeq.filter(values.contains)) - writeElement(elementSchema, elementRow, options) - writer.writeEndElement() } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala index 763aa877ca0..7d049fdd82b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala @@ -100,6 +100,9 @@ private[sql] class XmlOptions( val wildcardColName = parameters.getOrElse(WILDCARD_COL_NAME, XmlOptions.DEFAULT_WILDCARD_COL_NAME) val ignoreNamespace = getBool(IGNORE_NAMESPACE, false) + // setting indent to "" disables indentation in the generated XML. + // Each row will be written in a new line. 
+ val indent = parameters.getOrElse(INDENT, DEFAULT_INDENT) /** * Infer columns with all valid date entries as date type (otherwise inferred as string or @@ -198,6 +201,7 @@ private[sql] object XmlOptions extends DataSourceOptions { val DEFAULT_CHARSET: String = StandardCharsets.UTF_8.name val DEFAULT_NULL_VALUE: String = null val DEFAULT_WILDCARD_COL_NAME = "xs_any" + val DEFAULT_INDENT = " " val ROW_TAG = newOption("rowTag") val ROOT_TAG = newOption("rootTag") val DECLARATION = newOption("declaration") @@ -222,6 +226,7 @@ private[sql] object XmlOptions extends DataSourceOptions { val DATE_FORMAT = newOption("dateFormat") val TIMESTAMP_FORMAT = newOption("timestampFormat") val TIME_ZONE = newOption("timeZone") + val INDENT = newOption("indent") // Options with alternative val ENCODING = "encoding" val CHARSET = "charset" diff --git a/sql/core/pom.xml b/sql/core/pom.xml index b2b8398c9d5..8fabfd4699d 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -145,10 +145,6 @@ <groupId>org.apache.ws.xmlschema</groupId> <artifactId>xmlschema-core</artifactId> </dependency> - <dependency> - <groupId>org.glassfish.jaxb</groupId> - <artifactId>txw2</artifactId> - </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-asm9-shaded</artifactId> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala index cde866dcedf..ac3dfb287ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala @@ -17,15 +17,13 @@ package org.apache.spark.sql.execution.datasources.xml import java.nio.charset.Charset -import javax.xml.stream.XMLOutputFactory -import com.sun.xml.txw2.output.IndentingXMLStreamWriter import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.{xml, InternalRow} -import org.apache.spark.sql.catalyst.xml.XmlOptions +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, XmlOptions} import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter} import org.apache.spark.sql.types.StructType @@ -35,59 +33,20 @@ class XmlOutputWriter( context: TaskAttemptContext, options: XmlOptions) extends OutputWriter with Logging { - private val DEFAULT_INDENT = " " private val charset = Charset.forName(options.charset) private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) - private val factory = XMLOutputFactory.newInstance() - private val xmlWriter = factory.createXMLStreamWriter(writer) - private val indentingXmlWriter = new IndentingXMLStreamWriter(xmlWriter) - indentingXmlWriter.setIndentStep(DEFAULT_INDENT) - // Allow a root tag to be like "rootTag foo='bar'" - // This is hacky; won't deal correctly with spaces in attributes, but want - // to make this at least work for simple cases without much complication - private val rootTagTokens = options.rootTag.split(" ") - private val rootElementName = rootTagTokens.head - private val rootAttributes: Map[String, String] = - if (rootTagTokens.length > 1) { - rootTagTokens.tail.map { kv => - val Array(k, v) = kv.split("=") - k -> v.replaceAll("['\"]", "") - }.toMap - } else { - Map.empty - } - private val declaration = options.declaration - - - 
// private val gen = new UnivocityGenerator(dataSchema, writer, params) + private val gen = new StaxXmlGenerator(dataSchema, writer, options) private var firstRow: Boolean = true - override def write(row: InternalRow): Unit = { if (firstRow) { - if (declaration != null && declaration.nonEmpty) { - indentingXmlWriter.writeProcessingInstruction("xml", declaration) - indentingXmlWriter.writeCharacters("\n") - } - indentingXmlWriter.writeStartElement(rootElementName) - rootAttributes.foreach { case (k, v) => - indentingXmlWriter.writeAttribute(k, v) - } + gen.writeDeclaration() firstRow = false } - xml.StaxXmlGenerator( - dataSchema, - indentingXmlWriter, - options)(row) + gen.write(row) } - override def close(): Unit = { - if (!firstRow) { - indentingXmlWriter.writeEndElement() - indentingXmlWriter.close() - } - writer.close() - } + override def close(): Unit = gen.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 34e18cdf27a..b5e40fe35cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -7218,6 +7218,36 @@ object functions { withExpr(SchemaOfXml(xml.expr, options.asScala.toMap)) } + // scalastyle:off line.size.limit + + /** + * (Java-specific) Converts a column containing a `StructType` into a XML string with + * the specified schema. Throws an exception, in the case of an unsupported type. + * + * @param e a column containing a struct. + * @param options options to control how the struct column is converted into a XML string. + * It accepts the same options as the XML data source. + * See + * <a href= + * "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> + * Data Source Option</a> in the version you use. + * @group xml_funcs + * @since 4.0.0 + */ + // scalastyle:on line.size.limit + def to_xml(e: Column, options: java.util.Map[String, String]): Column = + fnWithOptions("to_xml", options.asScala.iterator, e) + + /** + * Converts a column containing a `StructType` into a XML string with the specified schema. + * Throws an exception, in the case of an unsupported type. + * + * @param e a column containing a struct. + * @group xml_funcs + * @since 4.0.0 + */ + def to_xml(e: Column): Column = to_xml(e, Map.empty[String, String].asJava) + /** * A transform for timestamps and dates to partition data into years. 
* diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 42907b52cda..017cc474ea0 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -318,6 +318,7 @@ | org.apache.spark.sql.catalyst.expressions.StringTrimRight | rtrim | SELECT rtrim(' SparkSQL ') | struct<rtrim( SparkSQL ):string> | | org.apache.spark.sql.catalyst.expressions.StructsToCsv | to_csv | SELECT to_csv(named_struct('a', 1, 'b', 2)) | struct<to_csv(named_struct(a, 1, b, 2)):string> | | org.apache.spark.sql.catalyst.expressions.StructsToJson | to_json | SELECT to_json(named_struct('a', 1, 'b', 2)) | struct<to_json(named_struct(a, 1, b, 2)):string> | +| org.apache.spark.sql.catalyst.expressions.StructsToXml | to_xml | SELECT to_xml(named_struct('a', 1, 'b', 2)) | struct<to_xml(named_struct(a, 1, b, 2)):string> | | org.apache.spark.sql.catalyst.expressions.Substring | substr | SELECT substr('Spark SQL', 5) | struct<substr(Spark SQL, 5, 2147483647):string> | | org.apache.spark.sql.catalyst.expressions.Substring | substring | SELECT substring('Spark SQL', 5) | struct<substring(Spark SQL, 5, 2147483647):string> | | org.apache.spark.sql.catalyst.expressions.SubstringIndex | substring_index | SELECT substring_index('www.apache.org', '.', 2) | struct<substring_index(www.apache.org, ., 2):string> | diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out index e62f4aab344..51cf3d976f6 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out @@ -1,4 +1,126 @@ -- Automatically generated by SQLQueryTestSuite +-- !query +select to_xml(named_struct('a', 1, 'b', 2), map('indent', '')) +-- !query analysis +Project [to_xml((indent,), named_struct(a, 1, b, 2), Some(America/Los_Angeles)) AS to_xml(named_struct(a, 1, b, 2))#x] ++- OneRowRelation + + +-- !query +select to_xml(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy', 'indent', '')) +-- !query analysis +Project [to_xml((timestampFormat,dd/MM/yyyy), (indent,), named_struct(time, to_timestamp(2015-08-26, Some(yyyy-MM-dd), TimestampType, Some(America/Los_Angeles), false)), Some(America/Los_Angeles)) AS to_xml(named_struct(time, to_timestamp(2015-08-26, yyyy-MM-dd)))#x] ++- OneRowRelation + + +-- !query +select to_xml(array(named_struct('a', 1, 'b', 2))) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"array(named_struct(a, 1, b, 2))\"", + "inputType" : "\"ARRAY<STRUCT<a: INT, b: INT>>\"", + "paramIndex" : "1", + "requiredType" : "\"STRUCT\"", + "sqlExpr" : "\"to_xml(array(named_struct(a, 1, b, 2)))\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 50, + "fragment" : "to_xml(array(named_struct('a', 1, 'b', 2)))" + } ] +} + + +-- !query +select to_xml(map('a', 1)) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"map(a, 1)\"", + "inputType" : 
"\"MAP<STRING, INT>\"", + "paramIndex" : "1", + "requiredType" : "\"STRUCT\"", + "sqlExpr" : "\"to_xml(map(a, 1))\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 26, + "fragment" : "to_xml(map('a', 1))" + } ] +} + + +-- !query +select to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION", + "sqlState" : "42K06", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 79, + "fragment" : "to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))" + } ] +} + + +-- !query +select to_xml(named_struct('a', 1, 'b', 2), map('mode', 1)) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE", + "sqlState" : "42K06", + "messageParameters" : { + "mapType" : "\"MAP<STRING, INT>\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 59, + "fragment" : "to_xml(named_struct('a', 1, 'b', 2), map('mode', 1))" + } ] +} + + +-- !query +select to_xml() +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION", + "sqlState" : "42605", + "messageParameters" : { + "actualNum" : "0", + "docroot" : "https://spark.apache.org/docs/latest", + "expectedNum" : "[1, 2]", + "functionName" : "`to_xml`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 15, + "fragment" : "to_xml()" + } ] +} + + -- !query select from_xml('<p><a>1</a></p>', 'a INT') -- !query analysis diff --git a/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql index cdf56712b11..7e3d21ef753 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql @@ -1,4 +1,14 @@ --- from_json +-- to_xml +select to_xml(named_struct('a', 1, 'b', 2), map('indent', '')); +select to_xml(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy', 'indent', '')); +-- Check if errors handled +select to_xml(array(named_struct('a', 1, 'b', 2))); +select to_xml(map('a', 1)); +select to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')); +select to_xml(named_struct('a', 1, 'b', 2), map('mode', 1)); +select to_xml(); + +-- from_xml select from_xml('<p><a>1</a></p>', 'a INT'); select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); -- Check if errors handled @@ -11,15 +21,15 @@ select from_xml(); -- Clean up DROP VIEW IF EXISTS xmlTable; --- from_json - complex types +-- from_xml - complex types select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>'); select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>'); --- infer schema of json literal +-- infer schema of xml literal select schema_of_xml('<p><a>1</a><b>"2"</b></p>'); select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', schema_of_xml('<p><a>1</a><a>2</a></p>')); --- from_json - array type +-- from_xml - array type select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>'); select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>'); select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>'); diff --git 
a/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out index 61e8e9c8662..704addb7a93 100644 --- a/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out @@ -1,4 +1,138 @@ -- Automatically generated by SQLQueryTestSuite +-- !query +select to_xml(named_struct('a', 1, 'b', 2), map('indent', '')) +-- !query schema +struct<to_xml(named_struct(a, 1, b, 2)):string> +-- !query output +<ROW><a>1</a><b>2</b></ROW> + + +-- !query +select to_xml(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy', 'indent', '')) +-- !query schema +struct<to_xml(named_struct(time, to_timestamp(2015-08-26, yyyy-MM-dd))):string> +-- !query output +<ROW><time>26/08/2015</time></ROW> + + +-- !query +select to_xml(array(named_struct('a', 1, 'b', 2))) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"array(named_struct(a, 1, b, 2))\"", + "inputType" : "\"ARRAY<STRUCT<a: INT, b: INT>>\"", + "paramIndex" : "1", + "requiredType" : "\"STRUCT\"", + "sqlExpr" : "\"to_xml(array(named_struct(a, 1, b, 2)))\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 50, + "fragment" : "to_xml(array(named_struct('a', 1, 'b', 2)))" + } ] +} + + +-- !query +select to_xml(map('a', 1)) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"map(a, 1)\"", + "inputType" : "\"MAP<STRING, INT>\"", + "paramIndex" : "1", + "requiredType" : "\"STRUCT\"", + "sqlExpr" : "\"to_xml(map(a, 1))\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 26, + "fragment" : "to_xml(map('a', 1))" + } ] +} + + +-- !query +select to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION", + "sqlState" : "42K06", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 79, + "fragment" : "to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))" + } ] +} + + +-- !query +select to_xml(named_struct('a', 1, 'b', 2), map('mode', 1)) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE", + "sqlState" : "42K06", + "messageParameters" : { + "mapType" : "\"MAP<STRING, INT>\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 59, + "fragment" : "to_xml(named_struct('a', 1, 'b', 2), map('mode', 1))" + } ] +} + + +-- !query +select to_xml() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION", + "sqlState" : "42605", + "messageParameters" : { + "actualNum" : "0", + "docroot" : "https://spark.apache.org/docs/latest", + "expectedNum" : "[1, 2]", + "functionName" : "`to_xml`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + 
"stopIndex" : 15, + "fragment" : "to_xml()" + } ] +} + + -- !query select from_xml('<p><a>1</a></p>', 'a INT') -- !query schema diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala index 7e5817bc3a0..20600848019 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala @@ -1287,6 +1287,31 @@ class XmlSuite extends QueryTest with SharedSparkSession { assert(result.select("decoded").head().get(0) === Row(null, null)) } + test("from_xml to to_xml round trip") { + val xmlData = Seq( + "<person><age>100</age><name>Alice</name></person>", + "<person><age>100</age><name>Alice</name></person>", + "<person><age>100</age><name>Alice</name></person>") + val df = xmlData.toDF("xmlString") + val xmlSchema = schema_of_xml(xmlData.head) + + val df2 = df.withColumn("parsed", + from_xml(df.col("xmlString"), xmlSchema)) + val df3 = df2.select(to_xml($"parsed", Map("rowTag" -> "person").asJava)) + val xmlResult = df3.collect().map(_.getString(0).replaceAll("\\s+", "")) + assert(xmlData.sortBy(_.toString) === xmlResult.sortBy(_.toString)) + } + + test("to_xml to from_xml round trip") { + val df = spark.read.option("rowTag", "ROW").xml(getTestResourcePath(resDir + "cars.xml")) + val df1 = df.select(to_xml(struct("*")).as("xmlString")) + val schema = schema_of_xml(df1.select("xmlString").head().getString(0)) + val df2 = df1.select(from_xml($"xmlString", schema).as("fromXML")) + val df3 = df2.select(col("fromXML.*")) + assert(df3.collect().length === 3) + checkAnswer(df3, df) + } + test("decimals with scale greater than precision") { val spark = this.spark; import spark.implicits._ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org