sandip-db commented on code in PR #44875:
URL: https://github.com/apache/spark/pull/44875#discussion_r1467238788
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##########
@@ -354,74 +394,109 @@ class StaxXmlParser(
rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
val row = new Array[Any](schema.length)
val nameToIndex = getFieldNameToIndex(schema)
+ var badRecordException: Option[Throwable] = None
// If there are attributes, then we process them first.
- convertAttributes(rootAttributes, schema).toSeq.foreach {
- case (f, v) =>
- nameToIndex.get(f).foreach { row(_) = v }
+ try {
+ convertAttributes(rootAttributes, schema).toSeq.foreach {
+ case (f, v) =>
+ nameToIndex.get(f).foreach { row(_) = v }
+ }
+ } catch {
+ case NonFatal(e) =>
+ badRecordException = badRecordException.orElse(Some(e))
}
val wildcardColName = options.wildcardColName
val hasWildcard = schema.exists(_.name == wildcardColName)
- var badRecordException: Option[Throwable] = None
-
var shouldStop = false
while (!shouldStop) {
parser.nextEvent match {
- case e: StartElement => try {
- val attributes = e.getAttributes.asScala.toArray
+ case e: StartElement =>
val field = StaxXmlParserUtils.getName(e.asStartElement.getName,
options)
-
- nameToIndex.get(field) match {
- case Some(index) => schema(index).dataType match {
- case st: StructType =>
- row(index) = convertObjectWithAttributes(parser, st, field,
attributes)
-
- case ArrayType(dt: DataType, _) =>
- val values = Option(row(index))
- .map(_.asInstanceOf[ArrayBuffer[Any]])
- .getOrElse(ArrayBuffer.empty[Any])
- val newValue = dt match {
- case st: StructType =>
- convertObjectWithAttributes(parser, st, field, attributes)
- case dt: DataType =>
- convertField(parser, dt, field)
+ try {
+ val attributes = e.getAttributes.asScala.toArray
+ nameToIndex.get(field) match {
+ case Some(index) =>
+ schema(index).dataType match {
+ case st: StructType =>
+ try {
+ row(index) = convertObjectWithAttributes(parser, st,
field, attributes)
+ } catch {
+ case partialValueException: PartialValueException =>
+ badRecordException =
+
badRecordException.orElse(Some(partialValueException.cause))
+ row.update(index,
partialValueException.partialResult)
+ }
+
+
Review Comment:
```suggestion
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##########
@@ -354,74 +394,109 @@ class StaxXmlParser(
rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
val row = new Array[Any](schema.length)
val nameToIndex = getFieldNameToIndex(schema)
+ var badRecordException: Option[Throwable] = None
// If there are attributes, then we process them first.
- convertAttributes(rootAttributes, schema).toSeq.foreach {
- case (f, v) =>
- nameToIndex.get(f).foreach { row(_) = v }
+ try {
+ convertAttributes(rootAttributes, schema).toSeq.foreach {
Review Comment:
If the first attribute has a type mismatch, `convertAttributes` will drop the
subsequent ones.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]