chenhao-db commented on code in PR #50052:
URL: https://github.com/apache/spark/pull/50052#discussion_r1976506479
##########
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##########
@@ -759,14 +759,6 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
"sqlExpr" -> "\"to_csv(value)\""),
context = ExpectedContext(fragment = "to_csv",
getCurrentClassCallSitePattern)
)
-
- checkError(
- exception = intercept[SparkUnsupportedOperationException] {
- df.select(from_csv(lit("data"), valueSchema, Map.empty[String,
String])).collect()
Review Comment:
Done.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -68,6 +69,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
+ dataSchema.foreach { field =>
+ if (!supportDataType(field.dataType, allowVariant = false)) {
+ throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError("CSV", field)
Review Comment:
The plan is to never support it.
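For anyone following along, a minimal sketch of what this check rejects on the write path (a hypothetical test snippet: the `spark` session, the output path, and the asserted message fragment are assumptions, and the exact error class may differ):

    // Writing a variant column through the CSV writer should now fail up front.
    val df = spark.sql("""SELECT parse_json('{"a": 1}') AS v""")
    val e = intercept[AnalysisException] {
      df.write.csv("/tmp/variant_out") // rejected by the supportDataType check above
    }
    assert(e.getMessage.contains("CSV"))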
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
}
+
+ test("csv with variant") {
+ withTempPath { path =>
+ val data =
+ """field 1,field2
+ |1.1,1e9
+ |,"hello
+ |world",true
+ |""".stripMargin
+ Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+ def checkSingleVariant(options: Map[String, String], expected: String*): Unit = {
+ val allOptions = options ++ Map("singleVariantColumn" -> "v")
+ checkAnswer(
+ spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as string)"),
+ expected.map(Row(_))
+ )
+ spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("count(*)"),
+ Row(expected.length)
+ )
+ }
+
+ checkSingleVariant(Map(),
+ """{"_c0":"field 1","_c1":"field2"}""",
+ """{"_c0":1.1,"_c1":"1e9"}""",
+ """{"_c0":null,"_c1":"hello"}""",
+ """{"_c0":"world\"","_c1":"true"}""")
+
+ checkSingleVariant(Map("header" -> "true"),
+ """{"field 1":1.1,"field2":"1e9"}""",
+ """{"field 1":null,"field2":"hello"}""",
+ """{"field 1":"world\"","field2":"true"}""")
+
+ checkSingleVariant(Map("multiLine" -> "true"),
+ """{"_c0":"field 1","_c1":"field2"}""",
+ """{"_c0":1.1,"_c1":"1e9"}""",
+ """{"_c0":null,"_c1":"hello\nworld","_c2":"true"}""")
+
+ checkSingleVariant(Map("multiLine" -> "true", "header" -> "true"),
+ """{"field 1":1.1,"field2":"1e9"}""",
+ """{"field 1":null,"field2":"hello\nworld"}""")
+
+ checkError(
+ exception = intercept[SparkException] {
+ val options = Map("singleVariantColumn" -> "v", "multiLine" ->
"true", "header" -> "true",
+ "mode" -> "failfast")
+ spark.read.options(options).csv(path.getCanonicalPath).collect()
+ }.getCause.asInstanceOf[SparkException],
+ condition = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
+ parameters = Map("badRecord" -> """[{"field
1":null,"field2":"hello\nworld"}]""",
+ "failFastMode" -> "FAILFAST")
+ )
+
+ def checkSchema(options: Map[String, String], expected: (String, String)*): Unit = {
+ checkAnswer(
+ spark.read.options(options).schema("`field 1` variant, field2 variant")
Review Comment:
The existing test cases with null results already validate this.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
}
+
+ test("csv with variant") {
+ withTempPath { path =>
+ val data =
+ """field 1,field2
+ |1.1,1e9
+ |,"hello
+ |world",true
+ |""".stripMargin
+ Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+ def checkSingleVariant(options: Map[String, String], expected: String*): Unit = {
+ val allOptions = options ++ Map("singleVariantColumn" -> "v")
+ checkAnswer(
+ spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as string)"),
Review Comment:
The cast-to-string result is already sufficient for validation.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -48,6 +48,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
sparkSession.sessionState.conf.sessionLocalTimeZone)
val csvDataSource = CSVDataSource(parsedOptions)
+ !parsedOptions.needHeaderForSingleVariantColumn &&
Review Comment:
It is not simple to implement. The current way to split a file when `header` is true is to skip the header check for any partition that does not start at the beginning of the file. However, when both `singleVariantColumn` and `header` are set, every task needs to read the first line of the file to recover the field names, which is not straightforward in the current API (for example, `parseIterator` knows nothing about the "file"; it only sees a given range of lines). This could be a future optimization.
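For reference, the resulting shape of the splitability check, paraphrased from the diff above (an illustrative sketch, not the exact source; `parsedOptions` and `csvDataSource` are built inside the method, as the context lines show):

    override def isSplitable(
        sparkSession: SparkSession,
        options: Map[String, String],
        path: Path): Boolean = {
      // parsedOptions and csvDataSource are constructed here, as quoted above.
      // When singleVariantColumn requires the header, every split would need
      // the file's first line to recover field names, so splitting is disabled.
      !parsedOptions.needHeaderForSingleVariantColumn &&
        csvDataSource.isSplitable && super.isSplitable(sparkSession, options, path)
    }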
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -161,9 +164,12 @@ class UnivocityParser(
// Each input token is placed in each output row's position by mapping these. In this case,
//
// output row - ["A", 2]
- private val valueConverters: Array[ValueConverter] = {
- requiredSchema.map(f => makeConverter(f.name, f.dataType,
f.nullable)).toArray
- }
+ private val valueConverters: Array[ValueConverter] =
+ if (options.singleVariantColumn.isDefined) {
+ null
+ } else {
+ requiredSchema.map(f => makeConverter(f.name, f.dataType,
f.nullable)).toArray
+ }
Review Comment:
I tend to keep my current code. `lazy` is not free: every access to a Scala `lazy val` goes through a volatile initialization check, which adds up on a per-row hot path like this parser.
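For context on why `lazy` is not free: a Scala `lazy val` compiles to roughly the double-checked pattern below, so every read pays at least a volatile flag check. A stand-alone sketch with hypothetical names (not code from this PR):

    // Roughly what `lazy val value = build()` desugars to.
    final class LazyCell(build: () => Array[String]) {
      @volatile private var initialized = false
      private var cached: Array[String] = _

      def value: Array[String] = {
        if (!initialized) {        // volatile read on every access
          synchronized {           // slow path, entered only until initialized
            if (!initialized) {
              cached = build()
              initialized = true   // volatile write publishes `cached`
            }
          }
        }
        cached
      }
    }

On a per-row path like `UnivocityParser`, the explicit `null` in the `singleVariantColumn` branch avoids that per-access cost entirely.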
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.