[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107822881
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1223,6 +1223,13 @@ the following case-insensitive options:
     This is a JDBC writer related option. If specified, this option allows setting of
     database-specific table and partition options when creating a table
     (e.g., <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>). This option
     applies only to writing.
   </td>
 </tr>
+  <tr>
+    <td><code>createTableColumnTypes</code></td>
+    <td>
+      The database column data types to use instead of the defaults, when creating the table.
+      Data type information should be specified in the same format as CREATE TABLE columns
+      syntax (e.g: <code>"name CHAR(64), comments VARCHAR(1024)"</code>). The specified
+      types should be valid spark sql data types. This option applies only to writing.
+    </td>
+  </tr>
--- End diff --

Yeah, I see it works internally. However, it looks like these types are not 
explicitly documented: 
http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

So I have a slight concern about the description here.
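
For what it's worth, a minimal sketch (not from the PR itself) showing that Spark's 
DDL parser accepts these type strings; `CatalystSqlParser.parseTableSchema` is the 
internal API the patch calls, so this assumes Spark 2.2-era internals:

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // CHAR/VARCHAR parse fine even though the public data-types page does not
    // list them; they surface as StringType, with the original DDL type kept
    // in the field metadata (HIVE_TYPE_STRING), per the patch's own comments.
    val userSchema = CatalystSqlParser.parseTableSchema(
      "name CHAR(64), comments VARCHAR(1024)")
    println(userSchema.fieldNames.mkString(", "))  // name, comments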





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16209





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107728981
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -680,19 +681,71 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(schema: StructType, url: String): String = {
+  def schemaString(
+      df: DataFrame,
+      url: String,
+      createTableColumnTypes: Option[String] = None): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
-    schema.fields foreach { field =>
+    val userSpecifiedColTypesMap = createTableColumnTypes
+      .map(parseUserSpecifiedCreateTableColumnTypes(df, _))
+      .getOrElse(Map.empty[String, String])
+    df.schema.fields.foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
-      val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
+      val typ = userSpecifiedColTypesMap
+        .getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition)
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
     }
     if (sb.length < 2) "" else sb.substring(2)
   }
 
   /**
+   * Parses the user specified createTableColumnTypes option value string specified in the same
+   * format as create table ddl column types, and returns Map of field name and the data type to
+   * use in-place of the default data type.
+   */
+  private def parseUserSpecifiedCreateTableColumnTypes(
+    df: DataFrame,
--- End diff --

Will fix it. 





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107729028
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -362,4 +363,147 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       assert(sql("select * from people_view").count() == 2)
     }
   }
+
+  test("SPARK-10849: test schemaString - from createTableColumnTypes option values") {
+    def testCreateTableColDataTypes(types: Seq[String]): Unit = {
+      val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) }
+      val schema = colTypes
+        .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
+      val createTableColTypes =
+        colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", ")
+      val df = spark.createDataFrame(sparkContext.parallelize(Seq(Row.empty)), schema)
+
+      val expectedSchemaStr =
+        colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ")
+
+      assert(JdbcUtils.schemaString(df, url1, Option(createTableColTypes)) == expectedSchemaStr)
+    }
+
+    testCreateTableColDataTypes(Seq("boolean"))
+    testCreateTableColDataTypes(Seq("tinyint", "smallint", "int", "bigint"))
+    testCreateTableColDataTypes(Seq("float", "double"))
+    testCreateTableColDataTypes(Seq("string", "char(10)", "varchar(20)"))
+    testCreateTableColDataTypes(Seq("decimal(10,0)", "decimal(10,5)"))
+    testCreateTableColDataTypes(Seq("date", "timestamp"))
+    testCreateTableColDataTypes(Seq("binary"))
+  }
+
+  test("SPARK-10849: create table using user specified column type and verify on target table") {
+    def testUserSpecifiedColTypes(
+      df: DataFrame,
+      createTableColTypes: String,
+      expectedTypes: Map[String, String]): Unit = {
+      df.write
+        .mode(SaveMode.Overwrite)
+        .option("createTableColumnTypes", createTableColTypes)
+        .jdbc(url1, "TEST.DBCOLTYPETEST", properties)
+
+      // verify the data types of the created table by reading the database catalog of H2
+      val query =
+        """
+          |(SELECT column_name, type_name, character_maximum_length
+          | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST')
+        """.stripMargin
+      val rows = spark.read.jdbc(url1, query, properties).collect()
+
+      rows.foreach { row =>
+        val typeName = row.getString(1)
+        // For CHAR and VARCHAR, we also compare the max length
+        if (typeName.contains("CHAR")) {
+          val charMaxLength = row.getInt(2)
+          assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)")
+        } else {
+          assert(expectedTypes(row.getString(0)) == typeName)
+        }
+      }
+    }
+
+    val data = Seq[Row](Row(1, "dave", "Boston", "electric cars"))
--- End diff --

Forgot to delete the extra value. Will fix it. 





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107728867
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1223,6 +1223,13 @@ the following case-insensitive options:
     This is a JDBC writer related option. If specified, this option allows setting of
     database-specific table and partition options when creating a table
     (e.g., <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>). This option
     applies only to writing.
   </td>
 </tr>
+  <tr>
+    <td><code>createTableColumnTypes</code></td>
+    <td>
+      The database column data types to use instead of the defaults, when creating the table.
+      Data type information should be specified in the same format as CREATE TABLE columns
+      syntax (e.g: <code>"name CHAR(64), comments VARCHAR(1024)"</code>). The specified
+      types should be valid spark sql data types. This option applies only to writing.
+    </td>
+  </tr>
--- End diff --

VARCHAR(1024) is a valid data type in Spark SQL; it gets translated to String 
internally. The data types specified in this option are meant for the target 
database. VARCHAR is used, for example, because many RDBMSs do not have a 
String data type.

Thank you for reviewing @viirya.
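
For illustration, a hedged usage sketch; the JDBC URL, table name, and 
`connectionProperties` are placeholders, not values from this PR:

    // The DataFrame keeps its String columns; only the CREATE TABLE statement
    // issued against the target database uses the overridden types.
    df.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)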





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107611698
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -362,4 +363,147 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       assert(sql("select * from people_view").count() == 2)
     }
   }
+
+  test("SPARK-10849: test schemaString - from createTableColumnTypes option values") {
+    def testCreateTableColDataTypes(types: Seq[String]): Unit = {
+      val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) }
+      val schema = colTypes
+        .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
+      val createTableColTypes =
+        colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", ")
+      val df = spark.createDataFrame(sparkContext.parallelize(Seq(Row.empty)), schema)
+
+      val expectedSchemaStr =
+        colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ")
+
+      assert(JdbcUtils.schemaString(df, url1, Option(createTableColTypes)) == expectedSchemaStr)
+    }
+
+    testCreateTableColDataTypes(Seq("boolean"))
+    testCreateTableColDataTypes(Seq("tinyint", "smallint", "int", "bigint"))
+    testCreateTableColDataTypes(Seq("float", "double"))
+    testCreateTableColDataTypes(Seq("string", "char(10)", "varchar(20)"))
+    testCreateTableColDataTypes(Seq("decimal(10,0)", "decimal(10,5)"))
+    testCreateTableColDataTypes(Seq("date", "timestamp"))
+    testCreateTableColDataTypes(Seq("binary"))
+  }
+
+  test("SPARK-10849: create table using user specified column type and verify on target table") {
+    def testUserSpecifiedColTypes(
+      df: DataFrame,
+      createTableColTypes: String,
+      expectedTypes: Map[String, String]): Unit = {
+      df.write
+        .mode(SaveMode.Overwrite)
+        .option("createTableColumnTypes", createTableColTypes)
+        .jdbc(url1, "TEST.DBCOLTYPETEST", properties)
+
+      // verify the data types of the created table by reading the database catalog of H2
+      val query =
+        """
+          |(SELECT column_name, type_name, character_maximum_length
+          | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST')
+        """.stripMargin
+      val rows = spark.read.jdbc(url1, query, properties).collect()
+
+      rows.foreach { row =>
+        val typeName = row.getString(1)
+        // For CHAR and VARCHAR, we also compare the max length
+        if (typeName.contains("CHAR")) {
+          val charMaxLength = row.getInt(2)
+          assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)")
+        } else {
+          assert(expectedTypes(row.getString(0)) == typeName)
+        }
+      }
+    }
+
+    val data = Seq[Row](Row(1, "dave", "Boston", "electric cars"))
--- End diff --

Am I missing something? It looks like this row has 4 values, but the schema has 
only 3 fields. Is that intentional?
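
For readers following along: the mismatch matters because a Row's arity must match 
its StructType once the DataFrame is evaluated. A minimal illustration, with 
invented field names:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // A 3-field schema needs 3-value rows; a 4-value row fails at runtime.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("city", StringType),
      StructField("descr", StringType)))
    val ok = Row("dave", "Boston", "electric cars")  // arity matches
    // Row(1, "dave", "Boston", "electric cars")     // one value too many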





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107607034
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1223,6 +1223,13 @@ the following case-insensitive options:
     This is a JDBC writer related option. If specified, this option allows setting of
     database-specific table and partition options when creating a table
     (e.g., <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>). This option
     applies only to writing.
   </td>
 </tr>
+  <tr>
+    <td><code>createTableColumnTypes</code></td>
+    <td>
+      The database column data types to use instead of the defaults, when creating the table.
+      Data type information should be specified in the same format as CREATE TABLE columns
+      syntax (e.g: <code>"name CHAR(64), comments VARCHAR(1024)"</code>). The specified
+      types should be valid spark sql data types. This option applies only to writing.
+    </td>
+  </tr>
--- End diff --

`The specified types should be valid spark sql data types`? What does this mean?





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107606880
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -680,19 +681,71 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(schema: StructType, url: String): String = {
+  def schemaString(
+      df: DataFrame,
+      url: String,
+      createTableColumnTypes: Option[String] = None): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
-    schema.fields foreach { field =>
+    val userSpecifiedColTypesMap = createTableColumnTypes
+      .map(parseUserSpecifiedCreateTableColumnTypes(df, _))
+      .getOrElse(Map.empty[String, String])
+    df.schema.fields.foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
-      val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
+      val typ = userSpecifiedColTypesMap
+        .getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition)
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
     }
     if (sb.length < 2) "" else sb.substring(2)
   }
 
   /**
+   * Parses the user specified createTableColumnTypes option value string specified in the same
+   * format as create table ddl column types, and returns Map of field name and the data type to
+   * use in-place of the default data type.
+   */
+  private def parseUserSpecifiedCreateTableColumnTypes(
+    df: DataFrame,
--- End diff --

wrong indent.





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-22 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107562733
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -680,19 +681,63 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(schema: StructType, url: String): String = {
+  def schemaString(
+      schema: StructType,
+      url: String,
+      createTableColumnTypes: Option[String] = None): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
+    val userSpecifiedColTypesMap = createTableColumnTypes
+      .map(parseUserSpecifiedCreateTableColumnTypes(schema, _))
+      .getOrElse(Map.empty[String, String])
     schema.fields foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
-      val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
+      val typ: String = userSpecifiedColTypesMap.get(field.name)
+        .getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition)
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
     }
     if (sb.length < 2) "" else sb.substring(2)
   }
 
   /**
+   * Parses the user specified createTableColumnTypes option value string specified in the same
+   * format as create table ddl column types, and returns Map of field name and the data type to
+   * use in-place of the default data type.
+   */
+  private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType,
+    createTableColumnTypes: String): Map[String, String] = {
+    val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
+    val userColNames = userSchema.fieldNames
+    // check duplicate columns in the user specified column types.
+    if (userColNames.distinct.length != userColNames.length) {
+      val duplicates = userColNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }.mkString(", ")
+      throw new AnalysisException(
+        s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates")
+    }
+    // check user specified column names exists in the data frame schema.
+    val commonNames = userColNames.intersect(schema.fieldNames)
+    if (commonNames.length != userColNames.length) {
+      val invalidColumns = userColNames.diff(commonNames).mkString(", ")
+      throw new AnalysisException(
+        s"Found invalid column(s) in createTableColumnTypes option value: $invalidColumns")
+    }
+
+    // char/varchar gets translated to string type. Real data type specified by the user
+    // is available in the field metadata as HIVE_TYPE_STRING
+    userSchema.fields.map(f =>
+      f.name -> {
+        (if (f.metadata.contains(HIVE_TYPE_STRING)) {
+          f.metadata.getString(HIVE_TYPE_STRING)
+        } else {
+          f.dataType.catalogString
+        }).toUpperCase
--- End diff --

Done. Moved it to a separate function. Thanks for the suggestion.
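
A hedged sketch of what that extraction could look like; the helper name is 
invented here, and the merged patch may be shaped differently:

    // Inside JdbcUtils, where HIVE_TYPE_STRING is in scope: recover the user's
    // DDL type string, falling back to the catalog string when no metadata exists.
    private def userSpecifiedTypeString(f: StructField): String = {
      val typ =
        if (f.metadata.contains(HIVE_TYPE_STRING)) f.metadata.getString(HIVE_TYPE_STRING)
        else f.dataType.catalogString
      typ.toUpperCase
    }

    // usage: userSchema.fields.map(f => f.name -> userSpecifiedTypeString(f)).toMap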





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-22 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107562849
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -362,4 +363,80 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       assert(sql("select * from people_view").count() == 2)
     }
   }
+
+  test("SPARK-10849: create table using user specified column type.") {
+    val data = Seq[Row](
+      Row(1, "dave", "Boston", "electric cars"),
+      Row(2, "mary", "Seattle", "building planes")
+    )
+    val schema = StructType(
+      StructField("id", IntegerType) ::
+        StructField("first#name", StringType) ::
+        StructField("city", StringType) ::
+        StructField("descr", StringType) ::
+        Nil)
+    val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+    // Use database specific CHAR/VARCHAR types instead of String data type.
+    val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)"
+    assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) ==
+      s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """)
--- End diff --

Thanks for the review, @maropu. Fixed it.





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-22 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107562605
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -680,19 +681,63 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(schema: StructType, url: String): String = {
+  def schemaString(
+      schema: StructType,
+      url: String,
+      createTableColumnTypes: Option[String] = None): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
+    val userSpecifiedColTypesMap = createTableColumnTypes
+      .map(parseUserSpecifiedCreateTableColumnTypes(schema, _))
+      .getOrElse(Map.empty[String, String])
     schema.fields foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
-      val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
+      val typ: String = userSpecifiedColTypesMap.get(field.name)
+        .getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition)
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
     }
     if (sb.length < 2) "" else sb.substring(2)
   }
 
   /**
+   * Parses the user specified createTableColumnTypes option value string specified in the same
+   * format as create table ddl column types, and returns Map of field name and the data type to
+   * use in-place of the default data type.
+   */
+  private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType,
+    createTableColumnTypes: String): Map[String, String] = {
+    val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
+    val userColNames = userSchema.fieldNames
+    // check duplicate columns in the user specified column types.
+    if (userColNames.distinct.length != userColNames.length) {
+      val duplicates = userColNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }.mkString(", ")
+      throw new AnalysisException(
+        s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates")
+    }
+    // check user specified column names exists in the data frame schema.
+    val commonNames = userColNames.intersect(schema.fieldNames)
--- End diff --

Thank you for the review. Good question; I updated the PR with case-sensitivity 
handling. Column names from the user-specified schema are now matched against the 
DataFrame schema based on the SQLConf.CASE_SENSITIVE flag.
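
A minimal sketch of flag-driven matching, assuming a boolean read from 
SQLConf.CASE_SENSITIVE; the resolver in the merged patch may differ:

    // Match a user-specified column name against the DataFrame's field names,
    // honoring the case-sensitivity flag.
    def resolveColumn(userCol: String, fieldNames: Seq[String],
        caseSensitive: Boolean): Option[String] = {
      if (caseSensitive) fieldNames.find(_ == userCol)
      else fieldNames.find(_.equalsIgnoreCase(userCol))
    }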





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-21 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107091965
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -362,4 +363,80 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       assert(sql("select * from people_view").count() == 2)
     }
   }
+
+  test("SPARK-10849: create table using user specified column type.") {
+    val data = Seq[Row](
+      Row(1, "dave", "Boston", "electric cars"),
+      Row(2, "mary", "Seattle", "building planes")
+    )
+    val schema = StructType(
+      StructField("id", IntegerType) ::
+        StructField("first#name", StringType) ::
+        StructField("city", StringType) ::
+        StructField("descr", StringType) ::
+        Nil)
+    val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+    // Use database specific CHAR/VARCHAR types instead of String data type.
+    val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)"
+    assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) ==
+      s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """)
--- End diff --

nit: Drop the `s` interpolator at the head of the string.
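
To spell out the nit: the interpolator does nothing here because the literal 
contains no `$` placeholders, so a plain triple-quoted string suffices:

    // equivalent, without the unnecessary `s` prefix:
    """"id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """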





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107086057
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -362,4 +363,80 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       assert(sql("select * from people_view").count() == 2)
     }
   }
+
+  test("SPARK-10849: create table using user specified column type.") {
+    val data = Seq[Row](
+      Row(1, "dave", "Boston", "electric cars"),
+      Row(2, "mary", "Seattle", "building planes")
+    )
+    val schema = StructType(
+      StructField("id", IntegerType) ::
+        StructField("first#name", StringType) ::
+        StructField("city", StringType) ::
+        StructField("descr", StringType) ::
+        Nil)
+    val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+    // Use database specific CHAR/VARCHAR types instead of String data type.
+    val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)"
+    assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) ==
+      s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """)
--- End diff --

This is the case where users specify all the columns. In this case, we should mix 
up the order of the columns.

In addition, we also need a case where users specify only one or two columns.
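
A hedged sketch of the subset case, reusing `df` and `url1` from the quoted test; 
the TEXT defaults assume the StringType mapping seen above, and the merged test 
may differ:

    // Override only one column; the others fall back to the dialect defaults.
    val subsetColTypes = "city CHAR(20)"
    assert(JdbcUtils.schemaString(df.schema, url1, Option(subsetColTypes)) ==
      """"id" INTEGER , "first#name" TEXT , "city" CHAR(20) , "descr" TEXT """)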





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107084604
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -680,19 +681,63 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(schema: StructType, url: String): String = {
+  def schemaString(
+      schema: StructType,
+      url: String,
+      createTableColumnTypes: Option[String] = None): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
+    val userSpecifiedColTypesMap = createTableColumnTypes
+      .map(parseUserSpecifiedCreateTableColumnTypes(schema, _))
+      .getOrElse(Map.empty[String, String])
     schema.fields foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
-      val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
+      val typ: String = userSpecifiedColTypesMap.get(field.name)
+        .getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition)
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
     }
     if (sb.length < 2) "" else sb.substring(2)
   }
 
   /**
+   * Parses the user specified createTableColumnTypes option value string specified in the same
+   * format as create table ddl column types, and returns Map of field name and the data type to
+   * use in-place of the default data type.
+   */
+  private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType,
+    createTableColumnTypes: String): Map[String, String] = {
+    val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
+    val userColNames = userSchema.fieldNames
+    // check duplicate columns in the user specified column types.
+    if (userColNames.distinct.length != userColNames.length) {
+      val duplicates = userColNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }.mkString(", ")
+      throw new AnalysisException(
+        s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates")
+    }
+    // check user specified column names exists in the data frame schema.
+    val commonNames = userColNames.intersect(schema.fieldNames)
+    if (commonNames.length != userColNames.length) {
+      val invalidColumns = userColNames.diff(commonNames).mkString(", ")
+      throw new AnalysisException(
+        s"Found invalid column(s) in createTableColumnTypes option value: $invalidColumns")
+    }
+
+    // char/varchar gets translated to string type. Real data type specified by the user
+    // is available in the field metadata as HIVE_TYPE_STRING
+    userSchema.fields.map(f =>
+      f.name -> {
+        (if (f.metadata.contains(HIVE_TYPE_STRING)) {
+          f.metadata.getString(HIVE_TYPE_STRING)
+        } else {
+          f.dataType.catalogString
+        }).toUpperCase
--- End diff --

We can create a partial function here. 





[GitHub] spark pull request #16209: [SPARK-10849][SQL] Adds option to the JDBC data s...

2017-03-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16209#discussion_r107084419
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -680,19 +681,63 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(schema: StructType, url: String): String = {
+  def schemaString(
+      schema: StructType,
+      url: String,
+      createTableColumnTypes: Option[String] = None): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
+    val userSpecifiedColTypesMap = createTableColumnTypes
+      .map(parseUserSpecifiedCreateTableColumnTypes(schema, _))
+      .getOrElse(Map.empty[String, String])
     schema.fields foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
-      val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
+      val typ: String = userSpecifiedColTypesMap.get(field.name)
+        .getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition)
       val nullable = if (field.nullable) "" else "NOT NULL"
       sb.append(s", $name $typ $nullable")
     }
     if (sb.length < 2) "" else sb.substring(2)
   }
 
   /**
+   * Parses the user specified createTableColumnTypes option value string specified in the same
+   * format as create table ddl column types, and returns Map of field name and the data type to
+   * use in-place of the default data type.
+   */
+  private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType,
+    createTableColumnTypes: String): Map[String, String] = {
+    val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
+    val userColNames = userSchema.fieldNames
+    // check duplicate columns in the user specified column types.
+    if (userColNames.distinct.length != userColNames.length) {
+      val duplicates = userColNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }.mkString(", ")
+      throw new AnalysisException(
+        s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates")
+    }
+    // check user specified column names exists in the data frame schema.
+    val commonNames = userColNames.intersect(schema.fieldNames)
--- End diff --

Are we assuming the name comparison is always case sensitive?

