Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21389#discussion_r198341944
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---
@@ -202,4 +204,230 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
}
+
+  // Unsupported data types of csv, json, orc, and parquet are as follows:
+  //  csv -> R/W: Interval, Null, Array, Map, Struct
+  //  json -> W: Interval
+  //  orc -> W: Interval, Null
+  //  parquet -> R/W: Interval, Null
+ test("SPARK-24204 error handling for unsupported Array/Map/Struct types
- csv") {
+ withTempDir { dir =>
+ val csvDir = new File(dir, "csv").getCanonicalPath
+ var msg = intercept[UnsupportedOperationException] {
+ Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a,
b)").write.csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support
struct<a:int,b:string> data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ val schema = StructType.fromDDL("a struct<b: Int>")
+ spark.range(1).write.mode("overwrite").csv(csvDir)
+ spark.read.schema(schema).csv(csvDir).collect()
+ }.getMessage
+ assert(msg.contains("CSV data source does not support struct<b:int>
data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ Seq((1, Map("Tesla" -> 3))).toDF("id",
"cars").write.mode("overwrite").csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support
map<string,int> data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ val schema = StructType.fromDDL("a map<int, int>")
+ spark.range(1).write.mode("overwrite").csv(csvDir)
+ spark.read.schema(schema).csv(csvDir).collect()
+ }.getMessage
+ assert(msg.contains("CSV data source does not support map<int,int>
data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands")
+ .write.mode("overwrite").csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<string>
data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ val schema = StructType.fromDDL("a array<int>")
+ spark.range(1).write.mode("overwrite").csv(csvDir)
+ spark.read.schema(schema).csv(csvDir).collect()
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<int>
data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25,
4.25)))).toDF("id", "vectors")
+ .write.mode("overwrite").csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<double>
data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ val schema = StructType(StructField("a", new
UDT.MyDenseVectorUDT(), true) :: Nil)
+ spark.range(1).write.mode("overwrite").csv(csvDir)
+ spark.read.schema(schema).csv(csvDir).collect()
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<double>
data type."))
+ }
+ }
+
+ test("SPARK-24204 error handling for unsupported Interval data types -
csv, json, parquet, orc") {
+ withTempDir { dir =>
+ val tempDir = new File(dir, "files").getCanonicalPath
+
+ Seq("orc", "json").foreach { format =>
+ // write path
+ var msg = intercept[AnalysisException] {
+ sql("select interval 1
days").write.format(format).mode("overwrite").save(tempDir)
+ }.getMessage
+ assert(msg.contains("Cannot save interval data type into external
storage."))
+
+ msg = intercept[UnsupportedOperationException] {
+ spark.udf.register("testType", () => new IntervalData())
+ sql("select
testType()").write.format(format).mode("overwrite").save(tempDir)
+ }.getMessage
+ assert(msg.toLowerCase(Locale.ROOT)
+ .contains(s"$format data source does not support
calendarinterval data type."))
+
+ // read path
+        // We expect the types below to pass for backward compatibility
+
+        // Interval type
+        var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
+        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+        spark.read.schema(schema).format(format).load(tempDir).collect()
+
+        // UDT having interval data
+        schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
+        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+        spark.read.schema(schema).format(format).load(tempDir).collect()
+      }
+    }
+
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+
+      Seq("parquet", "csv").foreach { format =>
--- End diff ---
Nit: we can put all the write-path checks together to reduce duplicated code.
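
For example, a rough (untested) sketch that folds the duplicated write-path checks for all four formats into a single loop, reusing `tempDir`, `IntervalData`, and `Locale` from the diff above:

```scala
// Sketch only: exercise the write-path interval checks once per format
// instead of repeating them in separate blocks.
Seq("csv", "json", "parquet", "orc").foreach { format =>
  // A plain interval column is rejected at analysis time, before the writer runs.
  var msg = intercept[AnalysisException] {
    sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
  }.getMessage
  assert(msg.contains("Cannot save interval data type into external storage."))

  // A UDT backed by interval data is rejected by the data source itself.
  msg = intercept[UnsupportedOperationException] {
    spark.udf.register("testType", () => new IntervalData())
    sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
  }.getMessage
  assert(msg.toLowerCase(Locale.ROOT)
    .contains(s"$format data source does not support calendarinterval data type."))
}
```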
---