Github user sureshthalamati commented on a diff in the pull request:
https://github.com/apache/spark/pull/16209#discussion_r107729028
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -362,4 +363,147 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(sql("select * from people_view").count() == 2)
}
}
+
+ test("SPARK-10849: test schemaString - from createTableColumnTypes
option values") {
+ def testCreateTableColDataTypes(types: Seq[String]): Unit = {
+ val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t)
}
+ val schema = colTypes
+ .foldLeft(new StructType())((schema, colType) =>
schema.add(colType._1, colType._2))
+ val createTableColTypes =
+ colTypes.map { case (col, dataType) => s"$col $dataType"
}.mkString(", ")
+ val df =
spark.createDataFrame(sparkContext.parallelize(Seq(Row.empty)), schema)
+
+ val expectedSchemaStr =
+ colTypes.map { case (col, dataType) => s""""$col" $dataType """
}.mkString(", ")
+
+ assert(JdbcUtils.schemaString(df, url1, Option(createTableColTypes))
== expectedSchemaStr)
+ }
+
+ testCreateTableColDataTypes(Seq("boolean"))
+ testCreateTableColDataTypes(Seq("tinyint", "smallint", "int",
"bigint"))
+ testCreateTableColDataTypes(Seq("float", "double"))
+ testCreateTableColDataTypes(Seq("string", "char(10)", "varchar(20)"))
+ testCreateTableColDataTypes(Seq("decimal(10,0)", "decimal(10,5)"))
+ testCreateTableColDataTypes(Seq("date", "timestamp"))
+ testCreateTableColDataTypes(Seq("binary"))
+ }
+
+ test("SPARK-10849: create table using user specified column type and
verify on target table") {
+ def testUserSpecifiedColTypes(
+ df: DataFrame,
+ createTableColTypes: String,
+ expectedTypes: Map[String, String]): Unit = {
+ df.write
+ .mode(SaveMode.Overwrite)
+ .option("createTableColumnTypes", createTableColTypes)
+ .jdbc(url1, "TEST.DBCOLTYPETEST", properties)
+
+ // verify the data types of the created table by reading the
database catalog of H2
+ val query =
+ """
+ |(SELECT column_name, type_name, character_maximum_length
+ | FROM information_schema.columns WHERE table_name =
'DBCOLTYPETEST')
+ """.stripMargin
+ val rows = spark.read.jdbc(url1, query, properties).collect()
+
+ rows.foreach { row =>
+ val typeName = row.getString(1)
+ // For CHAR and VARCHAR, we also compare the max length
+ if (typeName.contains("CHAR")) {
+ val charMaxLength = row.getInt(2)
+ assert(expectedTypes(row.getString(0)) ==
s"$typeName($charMaxLength)")
+ } else {
+ assert(expectedTypes(row.getString(0)) == typeName)
+ }
+ }
+ }
+
+ val data = Seq[Row](Row(1, "dave", "Boston", "electric cars"))
--- End diff ---
Forgot to delete the extra value. Will fix it.
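
For reference, a minimal sketch of what the corrected line could look like, assuming the intent is simply to drop the leftover fourth value so the Row matches a three-column test schema (the schema itself is not shown in this hunk, so treat this as an illustration, not the actual fix):

    // hypothetical fix: extra trailing value removed from the test data
    val data = Seq[Row](Row(1, "dave", "Boston"))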