sandip-db commented on code in PR #44875:
URL: https://github.com/apache/spark/pull/44875#discussion_r1465717275
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##########
@@ -280,20 +267,45 @@ class StaxXmlParser(
while (!shouldStop) {
parser.nextEvent match {
case e: StartElement =>
- kvPairs +=
- (UTF8String.fromString(StaxXmlParserUtils.getName(e.asStartElement.getName, options)) ->
- convertField(parser, valueType))
+ val key = UTF8String.fromString(
+ StaxXmlParserUtils.getName(e.asStartElement.getName, options))
+ try {
+ kvPairs += key -> convertField(parser, valueType)
+ } catch {
+ case partialValueException: PartialValueException =>
badRecordException = badRecordException.orElse(Some(partialValueException.cause))
+ StaxXmlParserUtils.skipChildren(parser)
+ kvPairs += key -> partialValueException.partialResult
+ case NonFatal(e) =>
+ badRecordException = badRecordException.orElse(Some(e))
+ StaxXmlParserUtils.skipChildren(parser)
+ }
case c: Characters if !c.isWhiteSpace =>
// Create a value tag field for it
- kvPairs +=
+ val key = UTF8String.fromString(options.valueTag)
// TODO: We don't support an array value tags in map yet.
(UTF8String.fromString(options.valueTag) -> convertTo(c.getData, valueType))
+ try {
+ kvPairs += (key -> convertTo(c.getData, valueType))
+ } catch {
+ case partialValueException: PartialValueException =>
Review Comment:
`convertTo` is not going to throw `PartialValueException`, so this catch case is unreachable.
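If so, the value-tag branch could keep only the `NonFatal` case; a rough sketch of what that `catch` might look like (same behavior as the proposed change, just without the unreachable case):
```scala
// Sketch only: convertTo raises plain conversion errors, so NonFatal covers them.
try {
  kvPairs += (key -> convertTo(c.getData, valueType))
} catch {
  case NonFatal(e) =>
    badRecordException = badRecordException.orElse(Some(e))
    StaxXmlParserUtils.skipChildren(parser)
}
```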
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##########
@@ -385,31 +397,41 @@ class StaxXmlParser(
val attributes = e.getAttributes.asScala.toArray
val field = StaxXmlParserUtils.getName(e.asStartElement.getName, options)
- nameToIndex.get(field) match {
- case Some(index) => schema(index).dataType match {
- case st: StructType =>
- row(index) = convertObjectWithAttributes(parser, st, attributes)
-
- case ArrayType(dt: DataType, _) =>
- val values = Option(row(index))
- .map(_.asInstanceOf[ArrayBuffer[Any]])
- .getOrElse(ArrayBuffer.empty[Any])
- val newValue = dt match {
- case st: StructType =>
- convertObjectWithAttributes(parser, st, attributes)
- case dt: DataType =>
- convertField(parser, dt)
+ nameToIndex.get(field) match {
+ case Some(index) =>
+ try {
+ schema(index).dataType match {
+ case st: StructType =>
+ row(index) = convertObjectWithAttributes(parser, st, attributes)
+
+ case ArrayType(dt: DataType, _) =>
+ val values = Option(row(index))
+ .map(_.asInstanceOf[ArrayBuffer[Any]])
+ .getOrElse(ArrayBuffer.empty[Any])
+ val newValue = dt match {
+ case st: StructType =>
+ convertObjectWithAttributes(parser, st, attributes)
+ case dt: DataType =>
+ convertField(parser, dt)
+ }
+ row(index) = values :+ newValue
+
+ case dt: DataType =>
+ row(index) = convertField(parser, dt, attributes)
+ }
+ } catch {
+ case partialResultException: PartialValueException =>
Review Comment:
nit:
```suggestion
case partialValueException: PartialValueException =>
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2685,4 +2684,40 @@ class XmlSuite extends QueryTest with SharedSparkSession {
Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
)
}
+
+ test("return partial results for bad records") {
+ withTempDir { dir =>
+ val xmlBadRecord1 =
+ s"""<ROW>
+ | <double>0.1</double>
Review Comment:
Add integer `valueTag`:
```suggestion
| 3
| <double>0.1</double>
| 4
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2685,4 +2684,40 @@ class XmlSuite extends QueryTest with SharedSparkSession {
Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
)
}
+
+ test("return partial results for bad records") {
+ withTempDir { dir =>
+ val xmlBadRecord1 =
+ s"""<ROW>
+ | <double>0.1</double>
+ | <array>0</array>
+ | <array>mismatch</array>
+ | <array>2</array>
+ | <map>
+ | <key1>1</key1>
+ | <key2>2</key2>
+ | </map>
+ |</ROW>""".stripMargin
+ val xmlBadRecord2 =
+ s"""<ROW>
+ | <double>mismatch</double>
Review Comment:
```suggestion
| 3
| <double>mismatch</double>
| mismatchValue
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2685,4 +2684,40 @@ class XmlSuite extends QueryTest with SharedSparkSession {
Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
)
}
+
+ test("return partial results for bad records") {
+ withTempDir { dir =>
+ val xmlBadRecord1 =
+ s"""<ROW>
+ | <double>0.1</double>
+ | <array>0</array>
+ | <array>mismatch</array>
+ | <array>2</array>
+ | <map>
+ | <key1>1</key1>
+ | <key2>2</key2>
+ | </map>
+ |</ROW>""".stripMargin
+ val xmlBadRecord2 =
+ s"""<ROW>
+ | <double>mismatch</double>
+ | <array>mismatch</array>
+ | <array>1</array>
+ | <array>2</array>
+ | <map>
+ | <key1>mismatch</key1>
+ | <key2>2</key2>
+ | </map>
+ |</ROW>""".stripMargin
+ Files.write(new File(dir, "f0").toPath, (xmlBadRecord1 ++ xmlBadRecord2).getBytes)
+ val schema = "double double, array array<int>, map map<string, int>, _corrupt_record string"
Review Comment:
```suggestion
val schema = "_VALUE array<int>, double double, array array<int>, map map<string, int>, _corrupt_record string"
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2685,4 +2684,40 @@ class XmlSuite extends QueryTest with SharedSparkSession {
Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
)
}
+
+ test("return partial results for bad records") {
+ withTempDir { dir =>
+ val xmlBadRecord1 =
+ s"""<ROW>
+ | <double>0.1</double>
+ | <array>0</array>
+ | <array>mismatch</array>
+ | <array>2</array>
+ | <map>
+ | <key1>1</key1>
+ | <key2>2</key2>
Review Comment:
Add a valueTag in the map:
```suggestion
| <key1>1</key1>
| 3
| <key2>2</key2>
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2685,4 +2684,40 @@ class XmlSuite extends QueryTest with SharedSparkSession {
Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
)
}
+
+ test("return partial results for bad records") {
+ withTempDir { dir =>
+ val xmlBadRecord1 =
Review Comment:
Add a scenario with a nested struct object that has attributes and a valueTag.
Correct data:
`<ROW><struct attr="3.0">4.0<b>5.0</b></struct></ROW>`
XML data with mismatches:
```
<ROW><struct attr="mismatch1">4.0<b>5.0</b></struct></ROW>
<ROW><struct attr="3.0">mismatch2<b>5.0</b></struct></ROW>
<ROW><struct attr="3.0">4.0<b>mismatch3</b></struct></ROW>
```
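For that scenario, the struct field in the test schema could look something like the sketch below (this assumes the default `attributePrefix` of `_` and `valueTag` of `_VALUE`; adjust to whatever options the test uses):
```scala
// Assumed DDL for the struct-with-attribute-and-valueTag case above; the
// _attr and _VALUE names follow the default attributePrefix/valueTag options.
val structSchema = "struct struct<_attr: double, _VALUE: double, b: double>"
```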
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2685,4 +2684,40 @@ class XmlSuite extends QueryTest with SharedSparkSession {
Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
)
}
+
+ test("return partial results for bad records") {
+ withTempDir { dir =>
+ val xmlBadRecord1 =
+ s"""<ROW>
+ | <double>0.1</double>
+ | <array>0</array>
+ | <array>mismatch</array>
+ | <array>2</array>
+ | <map>
+ | <key1>1</key1>
+ | <key2>2</key2>
+ | </map>
+ |</ROW>""".stripMargin
+ val xmlBadRecord2 =
+ s"""<ROW>
+ | <double>mismatch</double>
+ | <array>mismatch</array>
+ | <array>1</array>
+ | <array>2</array>
+ | <map>
+ | <key1>mismatch</key1>
Review Comment:
```suggestion
| <map>
| mismatch1
| <key1>mismatch</key1>
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -21,17 +21,14 @@ import java.nio.file.{Files, Path, Paths}
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDateTime}
import java.util.TimeZone
-
Review Comment:
nit: revert
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -21,17 +21,14 @@ import java.nio.file.{Files, Path, Paths}
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDateTime}
import java.util.TimeZone
-
import scala.collection.immutable.ArraySeq
import scala.collection.mutable
import scala.io.Source
import scala.jdk.CollectionConverters._
-
Review Comment:
nit: revert
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##########
@@ -429,6 +451,7 @@ class StaxXmlParser(
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
+ StaxXmlParserUtils.skipChildren(parser)
Review Comment:
Will this skip valid entities?
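To make the question concrete, a hypothetical record (field names invented for illustration): if converting `<b>` throws here, does the new `skipChildren` call also consume the sibling `<c>`, which is still valid?
```scala
// Hypothetical record, in the same style as the suite's test data.
val record =
  s"""<ROW>
     |  <b>mismatch</b>
     |  <c>42</c>
     |</ROW>""".stripMargin
```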
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -21,17 +21,14 @@ import java.nio.file.{Files, Path, Paths}
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDateTime}
import java.util.TimeZone
-
import scala.collection.immutable.ArraySeq
import scala.collection.mutable
import scala.io.Source
import scala.jdk.CollectionConverters._
-
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec
-
Review Comment:
nit: revert
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala:
##########
@@ -280,20 +267,45 @@ class StaxXmlParser(
while (!shouldStop) {
parser.nextEvent match {
case e: StartElement =>
- kvPairs +=
- (UTF8String.fromString(StaxXmlParserUtils.getName(e.asStartElement.getName, options)) ->
- convertField(parser, valueType))
+ val key = UTF8String.fromString(
+ StaxXmlParserUtils.getName(e.asStartElement.getName, options))
+ try {
+ kvPairs += key -> convertField(parser, valueType)
+ } catch {
+ case partialValueException: PartialValueException =>
badRecordException = badRecordException.orElse(Some(partialValueException.cause))
+ StaxXmlParserUtils.skipChildren(parser)
+ kvPairs += key -> partialValueException.partialResult
+ case NonFatal(e) =>
+ badRecordException = badRecordException.orElse(Some(e))
+ StaxXmlParserUtils.skipChildren(parser)
+ }
case c: Characters if !c.isWhiteSpace =>
// Create a value tag field for it
- kvPairs +=
+ val key = UTF8String.fromString(options.valueTag)
// TODO: We don't support an array value tags in map yet.
(UTF8String.fromString(options.valueTag) -> convertTo(c.getData, valueType))
+ try {
+ kvPairs += (key -> convertTo(c.getData, valueType))
+ } catch {
+ case partialValueException: PartialValueException =>
+ badRecordException = badRecordException.orElse(Some(partialValueException.cause))
+ StaxXmlParserUtils.skipChildren(parser)
+ kvPairs += key -> partialValueException.partialResult
+ case NonFatal(e) =>
+ badRecordException = badRecordException.orElse(Some(e))
+ StaxXmlParserUtils.skipChildren(parser)
Review Comment:
Will this skip other valid <key, value> pairs?
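For instance, with a record like the one in the new test (sketched below with hypothetical data): if `<key1>` fails to convert, does `skipChildren` advance past `<key2>` and drop an otherwise valid pair?
```scala
// Hypothetical map record to illustrate the question above.
val mapRecord =
  s"""<ROW>
     |  <map>
     |    <key1>mismatch</key1>
     |    <key2>2</key2>
     |  </map>
     |</ROW>""".stripMargin
```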