HyukjinKwon commented on code in PR #43503:
URL: https://github.com/apache/spark/pull/43503#discussion_r1436659819
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##########
@@ -16,164 +16,201 @@
*/
package org.apache.spark.sql.catalyst.xml
+import java.io.Writer
import java.sql.Timestamp
-import javax.xml.stream.XMLStreamWriter
+import javax.xml.stream.XMLOutputFactory
import scala.collection.Map
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter
+import org.apache.hadoop.shaded.com.ctc.wstx.api.WstxOutputProperties
+
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-// This class is borrowed from Spark json datasource.
-private[sql] object StaxXmlGenerator {
+class StaxXmlGenerator(
+ schema: StructType,
+ writer: Writer,
+ options: XmlOptions,
+ validateStructure: Boolean = true) {
+
+ require(options.attributePrefix.nonEmpty,
+ "'attributePrefix' option should not be empty string.")
+ private val indentDisabled = options.indent == ""
+
+ private val gen = {
+ val factory = XMLOutputFactory.newInstance()
+ // to_xml disables structure validation to allow multiple root tags
+ factory.setProperty(WstxOutputProperties.P_OUTPUT_VALIDATE_STRUCTURE,
validateStructure)
+ val xmlWriter = factory.createXMLStreamWriter(writer)
+ if (!indentDisabled) {
+ val indentingXmlWriter = new IndentingXMLStreamWriter(xmlWriter)
+ indentingXmlWriter.setIndentStep(options.indent)
+ indentingXmlWriter
+ } else {
+ xmlWriter
+ }
+ }
+
+ private var rootElementWritten: Boolean = false
+ def writeDeclaration(): Unit = {
+ // Allow a root tag to be like "rootTag foo='bar'"
+ // This is hacky; won't deal correctly with spaces in attributes, but want
+ // to make this at least work for simple cases without much complication
+ val rootTagTokens = options.rootTag.split(" ")
+ val rootElementName = rootTagTokens.head
+ val rootAttributes: Map[String, String] =
+ if (rootTagTokens.length > 1) {
+ rootTagTokens.tail.map { kv =>
+ val Array(k, v) = kv.split("=")
+ k -> v.replaceAll("['\"]", "")
+ }.toMap
+ } else {
+ Map.empty
+ }
+ val declaration = options.declaration
+ if (declaration != null && declaration.nonEmpty) {
+ gen.writeProcessingInstruction("xml", declaration)
+ gen.writeCharacters("\n")
+ }
+ gen.writeStartElement(rootElementName)
+ rootAttributes.foreach { case (k, v) =>
+ gen.writeAttribute(k, v)
+ }
+ if (indentDisabled) {
+ gen.writeCharacters("\n")
+ }
+ rootElementWritten = true
+ }
+
+ def flush(): Unit = gen.flush()
+
+ def close(): Unit = {
+ if (rootElementWritten) {
+ gen.writeEndElement()
+ gen.close()
+ }
+ writer.close()
+ }
/**
* Transforms a single Row to XML
*
- * @param schema
- * the schema object used for conversion
- * @param writer
- * a XML writer object
- * @param options
- * options for XML datasource.
* @param row
- * The row to convert
+ * The row to convert
*/
- def apply(schema: StructType, writer: XMLStreamWriter, options: XmlOptions)(
- row: InternalRow): Unit = {
-
- require(
- options.attributePrefix.nonEmpty,
- "'attributePrefix' option should not be empty string.")
+ def write(row: InternalRow): Unit = {
+ writeChildElement(options.rowTag, schema, row)
+ if (indentDisabled) {
+ gen.writeCharacters("\n")
+ }
+ }
- def writeChildElement(name: String, dt: DataType, v: Any): Unit = (name,
dt, v) match {
+ def writeChildElement(name: String, dt: DataType, v: Any): Unit = (name, dt,
v) match {
+ // If this is meant to be value but in no child, write only a value
+ case (_, _, null) | (_, NullType, _) if options.nullValue == null =>
+ // Because usually elements having `null` do not exist, just do not write
+ // elements when given values are `null`.
+ case (_, _, _) if name == options.valueTag =>
// If this is meant to be value but in no child, write only a value
- case (_, _, null) | (_, NullType, _) if options.nullValue == null =>
- // Because usually elements having `null` do not exist, just do not write
- // elements when given values are `null`.
- case (_, _, _) if name == options.valueTag =>
- // If this is meant to be value but in no child, write only a value
- writeElement(dt, v, options)
- case (_, _, _) =>
- writer.writeStartElement(name)
- writeElement(dt, v, options)
- writer.writeEndElement()
- }
+ writeElement(dt, v, options)
+ case (_, _, _) =>
+ gen.writeStartElement(name)
+ writeElement(dt, v, options)
+ gen.writeEndElement()
+ }
- def writeChild(name: String, dt: DataType, v: Any): Unit = {
- (dt, v) match {
- // If this is meant to be attribute, write an attribute
- case (_, null) | (NullType, _)
- if name.startsWith(options.attributePrefix) && name !=
options.valueTag =>
- Option(options.nullValue).foreach {
-
writer.writeAttribute(name.substring(options.attributePrefix.length), _)
- }
- case _ if name.startsWith(options.attributePrefix) && name !=
options.valueTag =>
-
writer.writeAttribute(name.substring(options.attributePrefix.length),
v.toString)
-
- // For ArrayType, we just need to write each as XML element.
- case (ArrayType(ty, _), v: ArrayData) =>
- (0 until v.numElements()).foreach { i =>
- writeChildElement(name, ty, v.get(i, ty))
- }
- // For other datatypes, we just write normal elements.
- case _ =>
- writeChildElement(name, dt, v)
- }
- }
+ def writeChild(name: String, dt: DataType, v: Any): Unit = {
+ (dt, v) match {
+ // If this is meant to be attribute, write an attribute
+ case (_, null) | (NullType, _)
+ if name.startsWith(options.attributePrefix) && name !=
options.valueTag =>
+ Option(options.nullValue).foreach {
+ gen.writeAttribute(name.substring(options.attributePrefix.length), _)
+ }
+ case _ if name.startsWith(options.attributePrefix) && name !=
options.valueTag =>
+ gen.writeAttribute(name.substring(options.attributePrefix.length),
v.toString)
- def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt,
v) match {
- case (_, null) | (NullType, _) =>
writer.writeCharacters(options.nullValue)
- case (StringType, v: UTF8String) => writer.writeCharacters(v.toString)
- case (StringType, v: String) => writer.writeCharacters(v)
- case (TimestampType, v: Timestamp) =>
-
writer.writeCharacters(options.timestampFormatterInWrite.format(v.toInstant()))
- case (TimestampType, v: Long) =>
- writer.writeCharacters(options.timestampFormatterInWrite.format(v))
- case (DateType, v: Int) =>
- writer.writeCharacters(options.dateFormatterInWrite.format(v))
- case (IntegerType, v: Int) => writer.writeCharacters(v.toString)
- case (ShortType, v: Short) => writer.writeCharacters(v.toString)
- case (FloatType, v: Float) => writer.writeCharacters(v.toString)
- case (DoubleType, v: Double) => writer.writeCharacters(v.toString)
- case (LongType, v: Long) => writer.writeCharacters(v.toString)
- case (DecimalType(), v: java.math.BigDecimal) =>
writer.writeCharacters(v.toString)
- case (DecimalType(), v: Decimal) => writer.writeCharacters(v.toString)
- case (ByteType, v: Byte) => writer.writeCharacters(v.toString)
- case (BooleanType, v: Boolean) => writer.writeCharacters(v.toString)
-
- // For the case roundtrip in reading and writing XML files,
[[ArrayType]] cannot have
- // [[ArrayType]] as element type. It always wraps the element with
[[StructType]]. So,
- // this case only can happen when we convert a normal [[DataFrame]] to
XML file.
- // When [[ArrayType]] has [[ArrayType]] as elements, it is confusing
what is element name
- // for XML file.
+ // For ArrayType, we just need to write each as XML element.
case (ArrayType(ty, _), v: ArrayData) =>
(0 until v.numElements()).foreach { i =>
- writeChild(options.arrayElementName, ty, v.get(i, ty))
+ writeChildElement(name, ty, v.get(i, ty))
}
+ // For other datatypes, we just write normal elements.
+ case _ =>
+ writeChildElement(name, dt, v)
+ }
+ }
- case (MapType(_, vt, _), mv: Map[_, _]) =>
- val (attributes, elements) = mv.toSeq.partition { case (f, _) =>
- f.toString.startsWith(options.attributePrefix) && f.toString !=
options.valueTag
- }
- // We need to write attributes first before the value.
- (attributes ++ elements).foreach { case (k, v) =>
- writeChild(k.toString, vt, v)
- }
+ def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v)
match {
+ case (_, null) | (NullType, _) => gen.writeCharacters(options.nullValue)
+ case (StringType, v: UTF8String) => gen.writeCharacters(v.toString)
+ case (StringType, v: String) => gen.writeCharacters(v)
+ case (TimestampType, v: Timestamp) =>
+
gen.writeCharacters(options.timestampFormatterInWrite.format(v.toInstant()))
+ case (TimestampType, v: Long) =>
+ gen.writeCharacters(options.timestampFormatterInWrite.format(v))
+ case (DateType, v: Int) =>
+ gen.writeCharacters(options.dateFormatterInWrite.format(v))
+ case (IntegerType, v: Int) => gen.writeCharacters(v.toString)
+ case (ShortType, v: Short) => gen.writeCharacters(v.toString)
+ case (FloatType, v: Float) => gen.writeCharacters(v.toString)
+ case (DoubleType, v: Double) => gen.writeCharacters(v.toString)
+ case (LongType, v: Long) => gen.writeCharacters(v.toString)
+ case (DecimalType(), v: java.math.BigDecimal) =>
gen.writeCharacters(v.toString)
+ case (DecimalType(), v: Decimal) => gen.writeCharacters(v.toString)
+ case (ByteType, v: Byte) => gen.writeCharacters(v.toString)
+ case (BooleanType, v: Boolean) => gen.writeCharacters(v.toString)
+
+ // For the case roundtrip in reading and writing XML files, [[ArrayType]]
cannot have
+ // [[ArrayType]] as element type. It always wraps the element with
[[StructType]]. So,
+ // this case only can happen when we convert a normal [[DataFrame]] to XML
file.
+ // When [[ArrayType]] has [[ArrayType]] as elements, it is confusing what
is element name
+ // for XML file.
+ case (ArrayType(ty, _), v: ArrayData) =>
+ (0 until v.numElements()).foreach { i =>
+ writeChild(options.arrayElementName, ty, v.get(i, ty))
+ }
- case (mt: MapType, mv: MapData) => writeMapData(mt, mv)
+ case (MapType(_, vt, _), mv: Map[_, _]) =>
+ val (attributes, elements) = mv.toSeq.partition { case (f, _) =>
+ f.toString.startsWith(options.attributePrefix) && f.toString !=
options.valueTag
+ }
+ // We need to write attributes first before the value.
+ (attributes ++ elements).foreach { case (k, v) =>
+ writeChild(k.toString, vt, v)
Review Comment:
cc @sandip-db
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org