sandip-db commented on code in PR #50052:
URL: https://github.com/apache/spark/pull/50052#discussion_r1970795762


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -473,4 +523,21 @@ private[sql] object UnivocityParser {
       parser.options.columnNameOfCorruptRecord)
     filteredLines.flatMap(safeParser.parse)
   }
+
+  def appendCsvColumnToVariant(builder: VariantBuilder, s: String, nullValue: 
String): Unit = {
+    if (s == null || s == nullValue) {
+      builder.appendNull()
+      return
+    }
+    scala.util.Try(s.toLong) match {
+      case scala.util.Success(l) if l.toString == s =>
+        builder.appendLong(l)
+        return
+      case _ =>
+    }
+    if (builder.tryParseDecimal(s)) {
+      return
+    }
+    builder.appendString(s)

Review Comment:
   What about other data types (double, etc)?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -182,6 +182,8 @@ trait FileFormat {
    */
   def supportDataType(dataType: DataType): Boolean = true
 
+  def supportDataTypeReadOnly(dataType: DataType): Boolean = 
supportDataType(dataType)

Review Comment:
   nit:
   ```suggestion
     def supportReadDataType(dataType: DataType): Boolean = 
supportDataType(dataType)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##########
@@ -759,14 +759,6 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
         "sqlExpr" -> "\"to_csv(value)\""),
       context = ExpectedContext(fragment = "to_csv", 
getCurrentClassCallSitePattern)
     )
-
-    checkError(
-      exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, 
String])).collect()

Review Comment:
   Add a test to validate that `from_csv` supports Variant



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
     val textParsingException = 
malformedCSVException.getCause.asInstanceOf[TextParsingException]
     
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
+
+  test("csv with variant") {
+    withTempPath { path =>
+      val data =
+        """field 1,field2
+          |1.1,1e9
+          |,"hello
+          |world",true
+          |""".stripMargin
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+      def checkSingleVariant(options: Map[String, String], expected: String*): 
Unit = {
+        val allOptions = options ++ Map("singleVariantColumn" -> "v")
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as 
string)"),
+          expected.map(Row(_))
+        )
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("count(*)"),
+          Row(expected.length)
+        )

Review Comment:
   This seems redundant after the above `checkAnswer`.



##########
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##########
@@ -734,7 +734,7 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
     checkAnswer(actual, Row("-"))
   }
 
-  test("SPARK-47497: from_csv/to_csv does not support VariantType data") {
+  test("SPARK-47497: to_csv does not support VariantType data") {

Review Comment:
   What are the plans to add variant support to `to_csv`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -161,9 +164,12 @@ class UnivocityParser(
   // Each input token is placed in each output row's position by mapping 
these. In this case,
   //
   //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, 
f.nullable)).toArray
-  }
+  private val valueConverters: Array[ValueConverter] =
+    if (options.singleVariantColumn.isDefined) {
+      null
+    } else {
+      requiredSchema.map(f => makeConverter(f.name, f.dataType, 
f.nullable)).toArray
+    }

Review Comment:
   nit: 
   ```suggestion
     lazy val valueConverters: Array[ValueConverter] = {
         requiredSchema.map(f => makeConverter(f.name, f.dataType, 
f.nullable)).toArray
       }
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
     val textParsingException = 
malformedCSVException.getCause.asInstanceOf[TextParsingException]
     
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
+
+  test("csv with variant") {
+    withTempPath { path =>
+      val data =
+        """field 1,field2
+          |1.1,1e9
+          |,"hello
+          |world",true
+          |""".stripMargin
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+      def checkSingleVariant(options: Map[String, String], expected: String*): 
Unit = {
+        val allOptions = options ++ Map("singleVariantColumn" -> "v")
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as 
string)"),
+          expected.map(Row(_))
+        )
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("count(*)"),
+          Row(expected.length)
+        )
+      }
+
+      checkSingleVariant(Map(),
+        """{"_c0":"field 1","_c1":"field2"}""",
+        """{"_c0":1.1,"_c1":"1e9"}""",
+        """{"_c0":null,"_c1":"hello"}""",
+        """{"_c0":"world\"","_c1":"true"}""")
+
+      checkSingleVariant(Map("header" -> "true"),
+        """{"field 1":1.1,"field2":"1e9"}""",
+        """{"field 1":null,"field2":"hello"}""",
+        """{"field 1":"world\"","field2":"true"}""")
+
+      checkSingleVariant(Map("multiLine" -> "true"),
+        """{"_c0":"field 1","_c1":"field2"}""",
+        """{"_c0":1.1,"_c1":"1e9"}""",
+        """{"_c0":null,"_c1":"hello\nworld","_c2":"true"}""")
+
+      checkSingleVariant(Map("multiLine" -> "true", "header" -> "true"),
+        """{"field 1":1.1,"field2":"1e9"}""",
+        """{"field 1":null,"field2":"hello\nworld"}""")
+
+      checkError(
+        exception = intercept[SparkException] {
+          val options = Map("singleVariantColumn" -> "v", "multiLine" -> 
"true", "header" -> "true",
+            "mode" -> "failfast")
+          spark.read.options(options).csv(path.getCanonicalPath).collect()
+        }.getCause.asInstanceOf[SparkException],
+        condition = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
+        parameters = Map("badRecord" -> """[{"field 
1":null,"field2":"hello\nworld"}]""",
+          "failFastMode" -> "FAILFAST")
+      )
+
+      def checkSchema(options: Map[String, String], expected: (String, 
String)*): Unit = {
+        checkAnswer(
+          spark.read.options(options).schema("`field 1` variant, field2 
variant")

Review Comment:
   Add a test with a variant field in the schema,  but absent in the csv file



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -48,6 +48,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     val csvDataSource = CSVDataSource(parsedOptions)
+    !parsedOptions.needHeaderForSingleVariantColumn &&

Review Comment:
   This will have perf impact for large files. Can't you read header even for 
non-first file-split in single line mode?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
     val textParsingException = 
malformedCSVException.getCause.asInstanceOf[TextParsingException]
     
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
+
+  test("csv with variant") {
+    withTempPath { path =>
+      val data =
+        """field 1,field2
+          |1.1,1e9
+          |,"hello
+          |world",true
+          |""".stripMargin
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+      def checkSingleVariant(options: Map[String, String], expected: String*): 
Unit = {
+        val allOptions = options ++ Map("singleVariantColumn" -> "v")
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as 
string)"),

Review Comment:
   The test doesn't verify that VariantBuilder inferred the type correctly.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -68,6 +69,11 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    dataSchema.foreach { field =>
+      if (!supportDataType(field.dataType, allowVariant = false)) {
+        throw 
QueryCompilationErrors.dataTypeUnsupportedByDataSourceError("CSV", field)

Review Comment:
   Wfat is the plan to support CSV write?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -68,6 +69,11 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    dataSchema.foreach { field =>
+      if (!supportDataType(field.dataType, allowVariant = false)) {
+        throw 
QueryCompilationErrors.dataTypeUnsupportedByDataSourceError("CSV", field)

Review Comment:
   test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to