[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107822881 --- Diff: docs/sql-programming-guide.md --- @@ -1223,6 +1223,13 @@ the following case-insensitive options: This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing. + + +createTableColumnTypes + + The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing. --- End diff -- Yeah, I see it works internally. However, it looks like these types are not explicitly documented: http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types So I have a slight concern about the description here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
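For context on the documented option, a minimal usage sketch follows. The column names, `url`, and `connectionProperties` are hypothetical, and the write call is shown commented out because it needs a running Spark session and a live database:

```scala
// Hypothetical column-type overrides; the names must match DataFrame columns.
val overrides = Seq("name" -> "CHAR(64)", "comments" -> "VARCHAR(1024)")
// Build the option value in the "col TYPE, col TYPE" format the docs describe.
val createTableColumnTypes =
  overrides.map { case (c, t) => s"$c $t" }.mkString(", ")
// df.write
//   .option("createTableColumnTypes", createTableColumnTypes)
//   .jdbc(url, "people", connectionProperties)
```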
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16209
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user sureshthalamati commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107728981 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -680,19 +681,71 @@ object JdbcUtils extends Logging { /** * Compute the schema string for this RDD. */ - def schemaString(schema: StructType, url: String): String = { + def schemaString( + df: DataFrame, + url: String, + createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) -schema.fields foreach { field => +val userSpecifiedColTypesMap = createTableColumnTypes + .map(parseUserSpecifiedCreateTableColumnTypes(df, _)) + .getOrElse(Map.empty[String, String]) +df.schema.fields.foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition + val typ = userSpecifiedColTypesMap +.getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } if (sb.length < 2) "" else sb.substring(2) } /** + * Parses the user specified createTableColumnTypes option value string specified in the same + * format as create table ddl column types, and returns Map of field name and the data type to + * use in-place of the default data type. + */ + private def parseUserSpecifiedCreateTableColumnTypes( +df: DataFrame, --- End diff -- Will fix it.
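The lookup order in the patched `schemaString` above can be sketched with plain collections (the column names and default types below are made up): a user-specified type wins, otherwise the dialect's default JDBC type is used.

```scala
// Dialect defaults keyed by column name (hypothetical values).
val defaults = Seq("id" -> "INTEGER", "name" -> "TEXT")
// Types parsed from the createTableColumnTypes option.
val userTypes = Map("name" -> "VARCHAR(100)")
// User override first, dialect default as the fallback.
val ddl = defaults.map { case (col, defaultType) =>
  s""""$col" ${userTypes.getOrElse(col, defaultType)}"""
}.mkString(", ")
// ddl: "id" INTEGER, "name" VARCHAR(100)
```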
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user sureshthalamati commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107729028 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -362,4 +363,147 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(sql("select * from people_view").count() == 2) } } + + test("SPARK-10849: test schemaString - from createTableColumnTypes option values") { +def testCreateTableColDataTypes(types: Seq[String]): Unit = { + val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) } + val schema = colTypes +.foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2)) + val createTableColTypes = +colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", ") + val df = spark.createDataFrame(sparkContext.parallelize(Seq(Row.empty)), schema) + + val expectedSchemaStr = +colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ") + + assert(JdbcUtils.schemaString(df, url1, Option(createTableColTypes)) == expectedSchemaStr) +} + +testCreateTableColDataTypes(Seq("boolean")) +testCreateTableColDataTypes(Seq("tinyint", "smallint", "int", "bigint")) +testCreateTableColDataTypes(Seq("float", "double")) +testCreateTableColDataTypes(Seq("string", "char(10)", "varchar(20)")) +testCreateTableColDataTypes(Seq("decimal(10,0)", "decimal(10,5)")) +testCreateTableColDataTypes(Seq("date", "timestamp")) +testCreateTableColDataTypes(Seq("binary")) + } + + test("SPARK-10849: create table using user specified column type and verify on target table") { +def testUserSpecifiedColTypes( + df: DataFrame, + createTableColTypes: String, + expectedTypes: Map[String, String]): Unit = { + df.write +.mode(SaveMode.Overwrite) +.option("createTableColumnTypes", createTableColTypes) +.jdbc(url1, "TEST.DBCOLTYPETEST", properties) + + // verify the data types of the created table by reading the database catalog of H2 + val query = +""" + |(SELECT column_name, type_name, character_maximum_length + | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST') +""".stripMargin + val rows = spark.read.jdbc(url1, query, properties).collect() + + rows.foreach { row => +val typeName = row.getString(1) +// For CHAR and VARCHAR, we also compare the max length +if (typeName.contains("CHAR")) { + val charMaxLength = row.getInt(2) + assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)") +} else { + assert(expectedTypes(row.getString(0)) == typeName) +} + } +} + +val data = Seq[Row](Row(1, "dave", "Boston", "electric cars")) --- End diff -- Forgot to delete the extra value. Will fix it.
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user sureshthalamati commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107728867 --- Diff: docs/sql-programming-guide.md --- @@ -1223,6 +1223,13 @@ the following case-insensitive options: This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing. + + +createTableColumnTypes + + The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing. --- End diff -- VARCHAR(1024) is a valid data type in Spark SQL; it gets translated to String internally in Spark. The data types specified in this property are meant for the target database; VARCHAR is used, for example, because many RDBMSs do not have a String data type. Thank you for reviewing, @viirya.
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107611698 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -362,4 +363,147 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(sql("select * from people_view").count() == 2) } } + + test("SPARK-10849: test schemaString - from createTableColumnTypes option values") { +def testCreateTableColDataTypes(types: Seq[String]): Unit = { + val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) } + val schema = colTypes +.foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2)) + val createTableColTypes = +colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", ") + val df = spark.createDataFrame(sparkContext.parallelize(Seq(Row.empty)), schema) + + val expectedSchemaStr = +colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ") + + assert(JdbcUtils.schemaString(df, url1, Option(createTableColTypes)) == expectedSchemaStr) +} + +testCreateTableColDataTypes(Seq("boolean")) +testCreateTableColDataTypes(Seq("tinyint", "smallint", "int", "bigint")) +testCreateTableColDataTypes(Seq("float", "double")) +testCreateTableColDataTypes(Seq("string", "char(10)", "varchar(20)")) +testCreateTableColDataTypes(Seq("decimal(10,0)", "decimal(10,5)")) +testCreateTableColDataTypes(Seq("date", "timestamp")) +testCreateTableColDataTypes(Seq("binary")) + } + + test("SPARK-10849: create table using user specified column type and verify on target table") { +def testUserSpecifiedColTypes( + df: DataFrame, + createTableColTypes: String, + expectedTypes: Map[String, String]): Unit = { + df.write +.mode(SaveMode.Overwrite) +.option("createTableColumnTypes", createTableColTypes) +.jdbc(url1, "TEST.DBCOLTYPETEST", properties) + + // verify the data types of the created table by reading the database catalog of H2 + val query = +""" + |(SELECT column_name, type_name, character_maximum_length + | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST') +""".stripMargin + val rows = spark.read.jdbc(url1, query, properties).collect() + + rows.foreach { row => +val typeName = row.getString(1) +// For CHAR and VARCHAR, we also compare the max length +if (typeName.contains("CHAR")) { + val charMaxLength = row.getInt(2) + assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)") +} else { + assert(expectedTypes(row.getString(0)) == typeName) +} + } +} + +val data = Seq[Row](Row(1, "dave", "Boston", "electric cars")) --- End diff -- Am I missing something? It looks like you have 4 columns, but the schema has only 3 fields. Is that intentional?
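The CHAR/VARCHAR length comparison in the test above can be sketched without a database (the catalog row values below are made up):

```scala
// Simulated H2 catalog row: (column_name, type_name, character_maximum_length).
val (colName, typeName, charMaxLength) = ("CITY", "CHAR", 20)
val expectedTypes = Map("CITY" -> "CHAR(20)")
// For CHAR/VARCHAR types the max length is appended before comparing.
val actual =
  if (typeName.contains("CHAR")) s"$typeName($charMaxLength)" else typeName
// actual == expectedTypes(colName)
```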
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107607034 --- Diff: docs/sql-programming-guide.md --- @@ -1223,6 +1223,13 @@ the following case-insensitive options: This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing. + + +createTableColumnTypes + + The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing. --- End diff -- `The specified types should be valid spark sql data types`? What does that mean?
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107606880 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -680,19 +681,71 @@ object JdbcUtils extends Logging { /** * Compute the schema string for this RDD. */ - def schemaString(schema: StructType, url: String): String = { + def schemaString( + df: DataFrame, + url: String, + createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) -schema.fields foreach { field => +val userSpecifiedColTypesMap = createTableColumnTypes + .map(parseUserSpecifiedCreateTableColumnTypes(df, _)) + .getOrElse(Map.empty[String, String]) +df.schema.fields.foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition + val typ = userSpecifiedColTypesMap +.getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } if (sb.length < 2) "" else sb.substring(2) } /** + * Parses the user specified createTableColumnTypes option value string specified in the same + * format as create table ddl column types, and returns Map of field name and the data type to + * use in-place of the default data type. + */ + private def parseUserSpecifiedCreateTableColumnTypes( +df: DataFrame, --- End diff -- wrong indent.
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user sureshthalamati commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107562733 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -680,19 +681,63 @@ object JdbcUtils extends Logging { /** * Compute the schema string for this RDD. */ - def schemaString(schema: StructType, url: String): String = { + def schemaString( + schema: StructType, + url: String, + createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) +val userSpecifiedColTypesMap = createTableColumnTypes + .map(parseUserSpecifiedCreateTableColumnTypes(schema, _)) + .getOrElse(Map.empty[String, String]) schema.fields foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition + val typ: String = userSpecifiedColTypesMap.get(field.name) +.getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } if (sb.length < 2) "" else sb.substring(2) } /** + * Parses the user specified createTableColumnTypes option value string specified in the same + * format as create table ddl column types, and returns Map of field name and the data type to + * use in-place of the default data type. + */ + private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType, +createTableColumnTypes: String): Map[String, String] = { +val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes) +val userColNames = userSchema.fieldNames +// check duplicate columns in the user specified column types. 
+if (userColNames.distinct.length != userColNames.length) { + val duplicates = userColNames.groupBy(identity).collect { +case (x, ys) if ys.length > 1 => x + }.mkString(", ") + throw new AnalysisException( +s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates") +} +// check user specified column names exists in the data frame schema. +val commonNames = userColNames.intersect(schema.fieldNames) +if (commonNames.length != userColNames.length) { + val invalidColumns = userColNames.diff(commonNames).mkString(", ") + throw new AnalysisException( +s"Found invalid column(s) in createTableColumnTypes option value: $invalidColumns") +} + +// char/varchar gets translated to string type. Real data type specified by the user +// is available in the field metadata as HIVE_TYPE_STRING +userSchema.fields.map(f => + f.name -> { +(if (f.metadata.contains(HIVE_TYPE_STRING)) { + f.metadata.getString(HIVE_TYPE_STRING) +} else { + f.dataType.catalogString +}).toUpperCase --- End diff -- Done. Moved it to a separate function. Thanks for the suggestion.
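The duplicate-column check in the diff above boils down to this self-contained sketch (the column names are made up):

```scala
// Group identical names and keep those that occur more than once.
val userColNames = Seq("name", "city", "name")
val duplicates = userColNames.groupBy(identity).collect {
  case (col, occurrences) if occurrences.length > 1 => col
}.mkString(", ")
// duplicates: "name"
```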
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user sureshthalamati commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107562849 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -362,4 +363,80 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(sql("select * from people_view").count() == 2) } } + + test("SPARK-10849: create table using user specified column type.") { +val data = Seq[Row]( + Row(1, "dave", "Boston", "electric cars"), + Row(2, "mary", "Seattle", "building planes") +) +val schema = StructType( + StructField("id", IntegerType) :: +StructField("first#name", StringType) :: +StructField("city", StringType) :: +StructField("descr", StringType) :: +Nil) +val df = spark.createDataFrame(sparkContext.parallelize(data), schema) +// Use database specific CHAR/VARCHAR types instead of String data type. +val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)" +assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) == + s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """) --- End diff -- Thanks for the review, @maropu. Fixed it.
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user sureshthalamati commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107562605 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -680,19 +681,63 @@ object JdbcUtils extends Logging { /** * Compute the schema string for this RDD. */ - def schemaString(schema: StructType, url: String): String = { + def schemaString( + schema: StructType, + url: String, + createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) +val userSpecifiedColTypesMap = createTableColumnTypes + .map(parseUserSpecifiedCreateTableColumnTypes(schema, _)) + .getOrElse(Map.empty[String, String]) schema.fields foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition + val typ: String = userSpecifiedColTypesMap.get(field.name) +.getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } if (sb.length < 2) "" else sb.substring(2) } /** + * Parses the user specified createTableColumnTypes option value string specified in the same + * format as create table ddl column types, and returns Map of field name and the data type to + * use in-place of the default data type. + */ + private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType, +createTableColumnTypes: String): Map[String, String] = { +val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes) +val userColNames = userSchema.fieldNames +// check duplicate columns in the user specified column types. 
+if (userColNames.distinct.length != userColNames.length) { + val duplicates = userColNames.groupBy(identity).collect { +case (x, ys) if ys.length > 1 => x + }.mkString(", ") + throw new AnalysisException( +s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates") +} +// check user specified column names exists in the data frame schema. +val commonNames = userColNames.intersect(schema.fieldNames) --- End diff -- Thank you for the review. Good question; I updated the PR with case-sensitive handling. Column names from the user-specified schema are now matched against the DataFrame schema based on the SQLConf.CASE_SENSITIVE flag.
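The case-insensitive matching described in the reply above can be sketched as follows. The resolver here is a stand-in for Spark's name resolution when spark.sql.caseSensitive is false, and the column names are made up:

```scala
// Stand-in resolver: case-insensitive name comparison.
val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)
val dfFieldNames = Seq("Id", "Name", "City")
val userColNames = Seq("name", "CITY")
// Option columns that fail to resolve against the DataFrame schema.
val invalidColumns = userColNames.filterNot(u => dfFieldNames.exists(resolver(u, _)))
// invalidColumns is empty: every user column resolves case-insensitively.
```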
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107091965 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -362,4 +363,80 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(sql("select * from people_view").count() == 2) } } + + test("SPARK-10849: create table using user specified column type.") { +val data = Seq[Row]( + Row(1, "dave", "Boston", "electric cars"), + Row(2, "mary", "Seattle", "building planes") +) +val schema = StructType( + StructField("id", IntegerType) :: +StructField("first#name", StringType) :: +StructField("city", StringType) :: +StructField("descr", StringType) :: +Nil) +val df = spark.createDataFrame(sparkContext.parallelize(data), schema) +// Use database specific CHAR/VARCHAR types instead of String data type. +val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)" +assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) == + s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """) --- End diff -- nit: Drop interpolation `s` in the head.
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107086057 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -362,4 +363,80 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(sql("select * from people_view").count() == 2) } } + + test("SPARK-10849: create table using user specified column type.") { +val data = Seq[Row]( + Row(1, "dave", "Boston", "electric cars"), + Row(2, "mary", "Seattle", "building planes") +) +val schema = StructType( + StructField("id", IntegerType) :: +StructField("first#name", StringType) :: +StructField("city", StringType) :: +StructField("descr", StringType) :: +Nil) +val df = spark.createDataFrame(sparkContext.parallelize(data), schema) +// Use database specific CHAR/VARCHAR types instead of String data type. +val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)" +assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) == + s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """) --- End diff -- This is the case where users specify all the columns. In this case, we should mix up the order of the columns. In addition, we also need a case where users specify only one or two columns.
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107084604 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -680,19 +681,63 @@ object JdbcUtils extends Logging { /** * Compute the schema string for this RDD. */ - def schemaString(schema: StructType, url: String): String = { + def schemaString( + schema: StructType, + url: String, + createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) +val userSpecifiedColTypesMap = createTableColumnTypes + .map(parseUserSpecifiedCreateTableColumnTypes(schema, _)) + .getOrElse(Map.empty[String, String]) schema.fields foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition + val typ: String = userSpecifiedColTypesMap.get(field.name) +.getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } if (sb.length < 2) "" else sb.substring(2) } /** + * Parses the user specified createTableColumnTypes option value string specified in the same + * format as create table ddl column types, and returns Map of field name and the data type to + * use in-place of the default data type. + */ + private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType, +createTableColumnTypes: String): Map[String, String] = { +val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes) +val userColNames = userSchema.fieldNames +// check duplicate columns in the user specified column types. 
+if (userColNames.distinct.length != userColNames.length) { + val duplicates = userColNames.groupBy(identity).collect { +case (x, ys) if ys.length > 1 => x + }.mkString(", ") + throw new AnalysisException( +s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates") +} +// check user specified column names exists in the data frame schema. +val commonNames = userColNames.intersect(schema.fieldNames) +if (commonNames.length != userColNames.length) { + val invalidColumns = userColNames.diff(commonNames).mkString(", ") + throw new AnalysisException( +s"Found invalid column(s) in createTableColumnTypes option value: $invalidColumns") +} + +// char/varchar gets translated to string type. Real data type specified by the user +// is available in the field metadata as HIVE_TYPE_STRING +userSchema.fields.map(f => + f.name -> { +(if (f.metadata.contains(HIVE_TYPE_STRING)) { + f.metadata.getString(HIVE_TYPE_STRING) +} else { + f.dataType.catalogString +}).toUpperCase --- End diff -- We can create a partial function here.
[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16209#discussion_r107084419 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -680,19 +681,63 @@ object JdbcUtils extends Logging { /** * Compute the schema string for this RDD. */ - def schemaString(schema: StructType, url: String): String = { + def schemaString( + schema: StructType, + url: String, + createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) +val userSpecifiedColTypesMap = createTableColumnTypes + .map(parseUserSpecifiedCreateTableColumnTypes(schema, _)) + .getOrElse(Map.empty[String, String]) schema.fields foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition + val typ: String = userSpecifiedColTypesMap.get(field.name) +.getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } if (sb.length < 2) "" else sb.substring(2) } /** + * Parses the user specified createTableColumnTypes option value string specified in the same + * format as create table ddl column types, and returns Map of field name and the data type to + * use in-place of the default data type. + */ + private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType, +createTableColumnTypes: String): Map[String, String] = { +val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes) +val userColNames = userSchema.fieldNames +// check duplicate columns in the user specified column types. 
+if (userColNames.distinct.length != userColNames.length) { + val duplicates = userColNames.groupBy(identity).collect { +case (x, ys) if ys.length > 1 => x + }.mkString(", ") + throw new AnalysisException( +s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates") +} +// check user specified column names exists in the data frame schema. +val commonNames = userColNames.intersect(schema.fieldNames) --- End diff -- Are we assuming the name comparison is always case sensitive?