sandip-db commented on code in PR #50517:
URL: https://github.com/apache/spark/pull/50517#discussion_r2030644453
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DataSourceOptions.scala:
##########
@@ -71,4 +76,38 @@ object DataSourceOptions {
// as a single VARIANT type column in the table with the given column name.
// E.g.
spark.read.format("<data-source-format>").option("singleVariantColumn",
"colName")
val SINGLE_VARIANT_COLUMN = "singleVariantColumn"
+ // The common option name for all data sources that supports corrupt record.
In case of a parsing
+ // error, the record will be stored as a string in the column with the given
name.
+ // Theoretically, the behavior of this option is not affected by the parsing
mode
+ // (PERMISSIVE/FAILFAST/DROPMALFORMED). However, the corrupt record is only
visible to the user
+ // when in PERMISSIVE mode, because the queries will fail in FAILFAST mode,
or the row containing
+ // the corrupt record will be dropped in DROPMALFORMED mode.
+ val COLUMN_NAME_OF_CORRUPT_RECORD = "columnNameOfCorruptRecord"
+
+ // When `singleVariantColumn` is enabled and there is a user-specified
schema, the schema must
+ // either be a variant field, or a variant field plus a corrupt column field.
+ def validateSingleVariantColumn(
+ options: CaseInsensitiveMap[String],
+ userSpecifiedSchema: Option[StructType]): Unit = {
+ (options.get(SINGLE_VARIANT_COLUMN), userSpecifiedSchema) match {
+ case (Some(col), Some(schema)) =>
+ var valid = schema.fields.exists { f =>
+ f.dataType.isInstanceOf[VariantType] && f.name == col && f.nullable
+ }
+ schema.length match {
+ case 1 =>
+ case 2 =>
+ val corruptRecordName = options.getOrElse(
Review Comment:
nit:
```suggestion
val corruptRecordColumnName = options.getOrElse(
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala:
##########
@@ -396,7 +396,7 @@ object CSVOptions extends DataSourceOptions {
val EMPTY_VALUE = newOption("emptyValue")
val LINE_SEP = newOption("lineSep")
val INPUT_BUFFER_SIZE = newOption("inputBufferSize")
- val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
+ val COLUMN_NAME_OF_CORRUPT_RECORD =
newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
Review Comment:
Add from_csv and csv file reader test with corrupt column and variant
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala:
##########
@@ -4007,6 +4007,7 @@ abstract class JsonSuite
"true",
"""{"a": [], "b": null}""",
"""{"a": 1}""",
+ "bad json",
Review Comment:
Add from_json test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]