[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80641407
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
+val partitionInfo = if (partitionColumn == null) {
   null
 } else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
--- End diff --

I also took a look at @gatorsmile 's approach, I think it's easier to 
understand, why it's rejected? We can also get rid of the `return`:
```
if (tableExists) {
  mode match {
case SaveMode.Ignore =>
..
  }
} else {
  ..
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80628570
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
+val partitionInfo = if (partitionColumn == null) {
   null
 } else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
--- End diff --

If the table does not exist and the mode is `OVERWRITE`, we create a table, 
then insert rows into the table, and finally return a BaseRelation. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80628287
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
+val partitionInfo = if (partitionColumn == null) {
   null
 } else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
--- End diff --

Now, at least, three of reviewers are confused of this bit. Do you mind if 
I submit a PR to clean up this part?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80627940
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
+val partitionInfo = if (partitionColumn == null) {
   null
 } else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
--- End diff --

what does this table mean? what is `CreateTable, saveTable, BaseRelation`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/12601


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-25 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80404639
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +210,84 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST"))
+.save
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-25 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80404577
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1096,13 +1096,17 @@ the Data Sources API. The following options are 
supported:
 
 {% highlight sql %}
 
-CREATE TEMPORARY VIEW jdbcTable
+CREATE TEMPORARY TABLE jdbcTable
--- End diff --

Done, thanks. I had been going off of the tests


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80353253
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -420,62 +420,11 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   def jdbc(url: String, table: String, connectionProperties: Properties): 
Unit = {
 assertNotPartitioned("jdbc")
 assertNotBucketed("jdbc")
-
-// to add required options like URL and dbtable
-val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> 
table)
-val jdbcOptions = new JDBCOptions(params)
-val jdbcUrl = jdbcOptions.url
-val jdbcTable = jdbcOptions.table
-
-val props = new Properties()
-extraOptions.foreach { case (key, value) =>
-  props.put(key, value)
-}
 // connectionProperties should override settings in extraOptions
-props.putAll(connectionProperties)
-val conn = JdbcUtils.createConnectionFactory(jdbcUrl, props)()
-
-try {
-  var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)
-
-  if (mode == SaveMode.Ignore && tableExists) {
-return
-  }
-
-  if (mode == SaveMode.ErrorIfExists && tableExists) {
-sys.error(s"Table $jdbcTable already exists.")
-  }
-
-  if (mode == SaveMode.Overwrite && tableExists) {
-if (jdbcOptions.isTruncate &&
-JdbcUtils.isCascadingTruncateTable(jdbcUrl) == Some(false)) {
-  JdbcUtils.truncateTable(conn, jdbcTable)
-} else {
-  JdbcUtils.dropTable(conn, jdbcTable)
-  tableExists = false
-}
-  }
-
-  // Create the table if the table didn't exist.
-  if (!tableExists) {
-val schema = JdbcUtils.schemaString(df, jdbcUrl)
-// To allow certain options to append when create a new table, 
which can be
-// table_options or partition_options.
-// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT 
CHARSET=utf8"
-val createtblOptions = jdbcOptions.createTableOptions
-val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
-val statement = conn.createStatement
-try {
-  statement.executeUpdate(sql)
-} finally {
-  statement.close()
-}
-  }
-} finally {
-  conn.close()
-}
-
-JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
+this.extraOptions = this.extraOptions ++ (connectionProperties.asScala)
+// explicit url and dbtable should override all
+this.extraOptions += ("url" -> url, "dbtable" -> table)
+format("jdbc").save
--- End diff --

The omission of parentheses on methods should only be used when the method 
has no side-effects. 

Thus, please change it to `save()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80353203
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +210,84 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST"))
+.save
--- End diff --

Nit: `save` -> `save()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80353010
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1096,13 +1096,17 @@ the Data Sources API. The following options are 
supported:
 
 {% highlight sql %}
 
-CREATE TEMPORARY VIEW jdbcTable
+CREATE TEMPORARY TABLE jdbcTable
--- End diff --

Please change it back. `CREATE TEMPORARY TABLE` is deprecated. You will get 
a Parser error
```
CREATE TEMPORARY TABLE is not supported yet. Please use CREATE TEMPORARY 
VIEW as an alternative.(line 1, pos 0)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80352586
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
 ---
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 // $example off:schema_merging$
 
--- End diff --

@HyukjinKwon Yes, that is what I was talking about...just fixed it back


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80352317
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
 ---
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 // $example off:schema_merging$
 
--- End diff --

Oh, maybe, my previous comment was not clear. I meant

```java
Import java.util.List;
// $example off:schema_merging$
Import java.util.Properties;
```

I haven't tried to build the doc against the current state but I guess we 
won't need this import for Parquet`s schema mering example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80350919
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
 ---
@@ -23,6 +23,8 @@
 import java.util.List;
 // $example off:schema_merging$
 
+import java.util.Properties;
+
--- End diff --

No reason to not follow the guildline?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80350755
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
 ---
@@ -23,6 +23,8 @@
 import java.util.List;
 // $example off:schema_merging$
 
+import java.util.Properties;
+
--- End diff --

Should this really be added to the example, though?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r80350458
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
 ---
@@ -23,6 +23,8 @@
 import java.util.List;
 // $example off:schema_merging$
 
+import java.util.Properties;
+
--- End diff --

I think we should put `java.util` imports together without additional 
newline.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-08 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77950064
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 ---
@@ -44,6 +46,11 @@ class JDBCOptions(
   // the number of partitions
   val numPartitions = parameters.getOrElse("numPartitions", null)
 
+  require(partitionColumn == null ||
+(partitionColumn != null && lowerBound != null && upperBound != null 
&& numPartitions != null),
--- End diff --

You can simplify it by
```Scala
partitionColumn == null || (lowerBound != null && upperBound != null && 
numPartitions != null)
```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77949636
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+import collection.JavaConverters._
+
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new SQLException(
--- End diff --

Yeah, then, just issue an `AnalysisException`, like what we did in the 
other places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77949563
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -82,7 +81,7 @@ class JdbcRelationProvider extends 
CreatableRelationProvider
 
   val (doCreate, doSave) = (mode, tableExists) match {
 case (SaveMode.Ignore, true) => (false, false)
-case (SaveMode.ErrorIfExists, true) => throw new 
TableAlreadyExistsException(
+case (SaveMode.ErrorIfExists, true) => throw new SQLException(
   s"Table $table already exists, and SaveMode is set to 
ErrorIfExists.")
--- End diff --

Normally, we issue an `AnalysisException`. `Table or view $table already 
exists`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77949242
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
--- End diff --

Added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77949211
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
--- End diff --

It depends on if the table exception is used or not. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77949176
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
--- End diff --

Correct, this was left from SchemaRelationProvider


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77949090
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
+val partitionInfo = if (partitionColumn == null) { 
+  null 
 } else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new 
TableAlreadyExistsException(
+  s"Table $table already exists, and SaveMode is set to 
ErrorIfExists.")
--- End diff --

No, [it is 
required...](https://github.com/apache/spark/blob/5c6b0855787c080d3e233eb09c05c025395e7cb3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala#L30)
 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948807
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
+val partitionInfo = if (partitionColumn == null) { 
+  null 
 } else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new 
TableAlreadyExistsException(
+  s"Table $table already exists, and SaveMode is set to 
ErrorIfExists.")
--- End diff --

Just pass the table name. That is enough


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948661
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
+val partitionInfo = if (partitionColumn == null) { 
+  null 
 } else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new 
TableAlreadyExistsException(
--- End diff --

You need to import it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948624
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
+val partitionInfo = if (partitionColumn == null) { 
+  null 
--- End diff --

It sounds like you add an extra space. 

Could you run the command to check the style? 
`dev/lint-scala`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948654
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
+val partitionInfo = if (partitionColumn == null) { 
+  null 
--- End diff --

It seems mistakenly white spaces were added here. scalastyle will complain 
about this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
--- End diff --

Not used, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948436
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,106 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
--- End diff --

Not used, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948300
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
--- End diff --

Not used?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948243
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST"))
+.save
+
+assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).count)
+assert(
+  2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).collect()(0).length)
+  }
+
+  test("save API with SaveMode.Overwrite") {
+import scala.collection.JavaConverters._
+
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), 
schema2)
+
+df.write.format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
+  .options(properties.asScala)
+  .save()
+df2.write.mode(SaveMode.Overwrite).format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
+  .options(properties.asScala)
+  .save()
+assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", 
properties).count())
+assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", 
properties).collect()(0).length)
+  }
+
+  test("save errors if url is not specified") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[RuntimeException] {
+  df.write.format("jdbc")
+.option("dbtable", "TEST.TRUNCATETEST")
+.options(properties.asScala)
+.save()
+}.getMessage
+assert(e.contains("Option 'url' is required"))
+  }
+
+  test("save errors if dbtable is not specified") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[RuntimeException] {
+  df.write.format("jdbc")
+.option("url", url1)
+.options(properties.asScala)
+.save()
+}.getMessage
+assert(e.contains("Option 'dbtable' is required"))
+  }
+
+  test("save errors if wrong user/password combination") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[org.h2.jdbc.JdbcSQLException] {
--- End diff --

I must have copy/pasted this from somewhere...no clue why I would use a var 
otherwise. Shame on me :p


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948196
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
--- End diff --

Yes, but the guidelines do not specify this scenario. It is not returning a 
unit, but a value and looks ridiculous in comparison. I have made the change to 
fit your needs  and "_speed_" up, though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948104
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+import collection.JavaConverters._
+
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new SQLException(
--- End diff --

I didn't know that exception had been created in the Spark code. Any 
suggestions for an easy way to pull out the db, though?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77948082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
--- End diff --

uh, we do not have a test case to cover that. Since you made a change, 
could you add such a test case? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77947902
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,113 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
--- End diff --

I did add a comment in the method signature. That and the variable naming 
conventions should cover this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77947885
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST"))
+.save
+
+assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).count)
+assert(
+  2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).collect()(0).length)
+  }
+
+  test("save API with SaveMode.Overwrite") {
+import scala.collection.JavaConverters._
+
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), 
schema2)
+
+df.write.format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
--- End diff --

How about `TRUNCATETEST` -> `SAVETEST`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77947343
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,113 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
--- End diff --

Then would it make sense if we add some comments for each case? In a quick 
look, it seems really confusing what each case means to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946981
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+import collection.JavaConverters._
+
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new SQLException(
+  s"Table $table already exists, and SaveMode is set to 
ErrorIfExists.")
+case (SaveMode.Overwrite, true) =>
+  if (jdbcOptions.isTruncate && 
JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
+JdbcUtils.truncateTable(conn, table)
+(false, true)
+  } else {
+JdbcUtils.dropTable(conn, table)
+(true, true)
+  }
+case (SaveMode.Append, true) => (false, true)
+case (_, true) => throw new IllegalArgumentException(s"Unexpected 
SaveMode, '$mode'," +
+  " for handling existing tables.")
+case (_, false) => (true, true)
+  }
  

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946944
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+import collection.JavaConverters._
+
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new SQLException(
+  s"Table $table already exists, and SaveMode is set to 
ErrorIfExists.")
+case (SaveMode.Overwrite, true) =>
+  if (jdbcOptions.isTruncate && 
JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
+JdbcUtils.truncateTable(conn, table)
+(false, true)
+  } else {
+JdbcUtils.dropTable(conn, table)
+(true, true)
+  }
+case (SaveMode.Append, true) => (false, true)
+case (_, true) => throw new IllegalArgumentException(s"Unexpected 
SaveMode, '$mode'," +
+  " for handling existing tables.")
+case (_, false) => (true, true)
+  }
  

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946854
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
--- End diff --

Let's follow this just to speed up and follow the majority of the other 
codes.
There is a correct example in the guide lines as blow:

```scala
// Correct:
if (true) {
  println("Wow!")
}
```

not

```scala
// Correct:
if (true) { println("Wow!") }
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946835
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST"))
+.save
+
+assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).count)
+assert(
+  2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).collect()(0).length)
+  }
+
+  test("save API with SaveMode.Overwrite") {
+import scala.collection.JavaConverters._
+
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), 
schema2)
+
+df.write.format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
--- End diff --

To what?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946806
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+import collection.JavaConverters._
+
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => throw new SQLException(
--- End diff --

Please throw the exception `TableAlreadyExistsException`. The target could 
be a `View`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946715
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,113 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
--- End diff --

Ok. I am fine, if the other are ok about it. Let me review your version. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946660
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST"))
+.save
+
+assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).count)
+assert(
+  2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).collect()(0).length)
+  }
+
+  test("save API with SaveMode.Overwrite") {
+import scala.collection.JavaConverters._
+
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), 
schema2)
+
+df.write.format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
+  .options(properties.asScala)
+  .save()
+df2.write.mode(SaveMode.Overwrite).format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
+  .options(properties.asScala)
+  .save()
+assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", 
properties).count())
+assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", 
properties).collect()(0).length)
+  }
+
+  test("save errors if url is not specified") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[RuntimeException] {
+  df.write.format("jdbc")
+.option("dbtable", "TEST.TRUNCATETEST")
+.options(properties.asScala)
+.save()
+}.getMessage
+assert(e.contains("Option 'url' is required"))
+  }
+
+  test("save errors if dbtable is not specified") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[RuntimeException] {
+  df.write.format("jdbc")
+.option("url", url1)
+.options(properties.asScala)
+.save()
+}.getMessage
+assert(e.contains("Option 'dbtable' is required"))
+  }
+
+  test("save errors if wrong user/password combination") {
+import scala.collection.JavaConverters._
--- End diff --

How about putting this import in the top with other imports if either is 
okay? I understand your point putting this inside but let's just follow the 
majority of other codes just to speed up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946637
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
--- End diff --

I had put that when adding the brackets, but it actually hurts the code 
flow in this case. And there is nothing in the style guide saying otherwise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946422
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+import collection.JavaConverters._
--- End diff --

I meant putting it up with other imports. Also, I remember this should be 
`scala.collection.JavaConverters`. scalastyle will check this IIRC.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946434
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
--- End diff --

It was moved into the JDBCOptions as had been previously discussed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946299
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,113 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
--- End diff --

Your way results in the need for a `return`, which can lead to problems and 
is [generally discouraged](https://tpolecat.github.io/2014/05/09/return.html). 
In the current implementation you could just have it do nothing and the next if 
block will be skipped anyway, but that leaves a lot of room for error in 
further code changes. Whereas this 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946277
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST"))
+.save
+
+assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).count)
+assert(
+  2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).collect()(0).length)
+  }
+
+  test("save API with SaveMode.Overwrite") {
+import scala.collection.JavaConverters._
+
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), 
schema2)
+
+df.write.format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
+  .options(properties.asScala)
+  .save()
+df2.write.mode(SaveMode.Overwrite).format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
+  .options(properties.asScala)
+  .save()
+assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", 
properties).count())
+assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", 
properties).collect()(0).length)
+  }
+
+  test("save errors if url is not specified") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[RuntimeException] {
+  df.write.format("jdbc")
+.option("dbtable", "TEST.TRUNCATETEST")
+.options(properties.asScala)
+.save()
+}.getMessage
+assert(e.contains("Option 'url' is required"))
+  }
+
+  test("save errors if dbtable is not specified") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[RuntimeException] {
+  df.write.format("jdbc")
+.option("url", url1)
+.options(properties.asScala)
+.save()
+}.getMessage
+assert(e.contains("Option 'dbtable' is required"))
+  }
+
+  test("save errors if wrong user/password combination") {
+import scala.collection.JavaConverters._
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+var e = intercept[org.h2.jdbc.JdbcSQLException] {
--- End diff --

`var` -> `val`. The same issues in the above test cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946237
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+
+df.write.format("jdbc")
+.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST"))
+.save
+
+assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).count)
+assert(
+  2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new 
Properties).collect()(0).length)
+  }
+
+  test("save API with SaveMode.Overwrite") {
+import scala.collection.JavaConverters._
+
+val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), 
schema2)
+val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), 
schema2)
+
+df.write.format("jdbc")
+  .option("url", url1)
+  .option("dbtable", "TEST.TRUNCATETEST")
--- End diff --

Could you update the table names in all the test cases? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946201
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
--- End diff --

Any reason why this is removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77946123
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) { null }
--- End diff --

```Scala
if (partitionColumn == null) {
  null
} else {
  ...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77945639
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---
@@ -208,4 +208,16 @@ class JDBCWriteSuite extends SharedSQLContext with 
BeforeAndAfter {
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
 assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", 
properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
--- End diff --

Negative test cases are needed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77945599
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,113 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
--- End diff --

I also prefer to my way, which looks cleaner and easier to understand.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77945537
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,104 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
--- End diff --

Please correct the style here. See 
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77945298
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,104 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
 JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+import collection.JavaConverters._
--- End diff --

Can we maybe move this up too if either is okay?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77944399
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,113 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite*  | (DropTable, CreateTable,) | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but 
instead truncate
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val jdbcOptions = new JDBCOptions(parameters)
+val url = jdbcOptions.url
+val table = jdbcOptions.table
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
--- End diff --

Initially, I mean to correct this as @gatorsmile did in 
[here](https://github.com/gatorsmile/spark/blob/07e316823ed17e89c3df0aaccf3fbb958afcfe3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L390-L423).
 I am not saying this is wrong or inappropriate but just personally I'd prefer 
this way.


---
If your 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-01 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77239532
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -114,7 +115,9 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(url, table, 
properties)
+  override val schema: StructType = {
+providedSchemaOption.getOrElse(JDBCRDD.resolveTable(url, table, 
properties))
+  }
--- End diff --

@HyukjinKwon I thought that since this was over the 100 limit it would be 
more readable/maintainable in the long run to have brackets. Again, I have no 
preference and if you feel strongly I will make the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-01 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77238783
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite   | DropTable, CreateTable,   | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+require(parameters.isDefinedAt("url"), "Saving jdbc source requires 
'url' to be set." +
+" (ie. df.option(\"url\", \"ACTUAL_URL\")")
+require(parameters.isDefinedAt("dbtable"), "Saving jdbc source 
requires 'dbtable' to be set." +
+" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")
+val url = parameters("url")
+val table = parameters("dbtable")
--- End diff --

@HyukjinKwon I thought about that, but this code ends up not needing the 
extra checks. So, it seemed unnecessary at this point. I can go either way on 
this one, though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77122510
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -17,39 +17,105 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.SQLException
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite   | DropTable, CreateTable,   | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+require(parameters.isDefinedAt("url"), "Saving jdbc source requires 
'url' to be set." +
+" (ie. df.option(\"url\", \"ACTUAL_URL\")")
+require(parameters.isDefinedAt("dbtable"), "Saving jdbc source 
requires 'dbtable' to be set." +
+" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")
+val url = parameters("url")
+val table = parameters("dbtable")
--- End diff --

Would this make sense if we use `JDBCOptions` here too?  (rather than 
adding `require` duplicately)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77122262
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -114,7 +115,9 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(url, table, 
properties)
+  override val schema: StructType = {
+providedSchemaOption.getOrElse(JDBCRDD.resolveTable(url, table, 
properties))
+  }
--- End diff --

It seems we might not need the extra brackets here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-09-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77122073
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -396,49 +396,14 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 1.4.0
*/
   def jdbc(url: String, table: String, connectionProperties: Properties): 
Unit = {
+import scala.collection.JavaConverters._
--- End diff --

+1 for moving the import up. Just it looks ugly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-31 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77067046
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 ---
@@ -36,4 +38,9 @@ private[jdbc] class JDBCOptions(
   val upperBound = parameters.getOrElse("upperBound", null)
   // the number of partitions
   val numPartitions = parameters.getOrElse("numPartitions", null)
+
+  require(partitionColumn == null || 
--- End diff --

Please double check my boolean conversion, as the prior if had to be 
flipped for the require.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-31 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77060202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 ---
@@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions(
   val upperBound = parameters.getOrElse("upperBound", null)
   // the number of partitions
   val numPartitions = parameters.getOrElse("numPartitions", null)
+
+  if (partitionColumn != null
+  && (lowerBound == null || upperBound == null || numPartitions == 
null)) {
+  sys.error("Partitioning incompletely specified")
--- End diff --

Agreed, as already stated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-31 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77061367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite   | DropTable, CreateTable,   | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val url = parameters.getOrElse("url",
+  sys.error("Saving jdbc source requires url to be set." +
+" (ie. df.option(\"url\", \"ACTUAL_URL\")"))
+val table = parameters.getOrElse("dbtable", 
parameters.getOrElse("table",
+  sys.error("Saving jdbc source requires dbtable to be set." +
+" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")))
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => sys.error(s"Table $table 
already exists.")
+case (SaveMode.Overwrite, true) =>
+  JdbcUtils.dropTable(conn, table)
--- 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-31 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77060138
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 ---
@@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions(
   val upperBound = parameters.getOrElse("upperBound", null)
--- End diff --

I am fine with changing this to require as the short-circuit should occur 
at the very top instead of as the processing moves through. `sys.error` was how 
the old code was written, so I just had kept it as is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-31 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77060790
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
--- End diff --

No, this is to meet the requirements of trait `RelationProvider`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-31 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r77058751
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -396,49 +396,14 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 1.4.0
*/
   def jdbc(url: String, table: String, connectionProperties: Properties): 
Unit = {
+import scala.collection.JavaConverters._
--- End diff --

I opted to only import them here because it is the only place they are 
required, so there is no need to drag in the import to the whole class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r76512092
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
 val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
-  sys.error("Partitioning incompletely specified")
-}
+val partitionColumn = jdbcOptions.partitionColumn
+val lowerBound = jdbcOptions.lowerBound
+val upperBound = jdbcOptions.upperBound
+val numPartitions = jdbcOptions.numPartitions
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties,
+  Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite   | DropTable, CreateTable,   | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val url = parameters.getOrElse("url",
+  sys.error("Saving jdbc source requires url to be set." +
+" (ie. df.option(\"url\", \"ACTUAL_URL\")"))
+val table = parameters.getOrElse("dbtable", 
parameters.getOrElse("table",
+  sys.error("Saving jdbc source requires dbtable to be set." +
+" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")))
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+  val (doCreate, doSave) = (mode, tableExists) match {
+case (SaveMode.Ignore, true) => (false, false)
+case (SaveMode.ErrorIfExists, true) => sys.error(s"Table $table 
already exists.")
+case (SaveMode.Overwrite, true) =>
+  JdbcUtils.dropTable(conn, table)
--- End 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r76512084
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 ---
@@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions(
   val upperBound = parameters.getOrElse("upperBound", null)
--- End diff --

I see some of the existing code has a `getOrElse(..., sys.error...)`. I 
think we should switch to either

```
require(foo.isDefined, ...)
... foo.get(...)
```
or
```
foo.getOrElse(..., throw new IllegalArgumentException(...))
```
One problem is that `sys.error` just generates a bald `RuntimeException`.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r76512050
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
--- End diff --

Is this a separate method instead of using an optional arg to try to retain 
binary compatibility?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r76512038
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -396,49 +396,14 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 1.4.0
*/
   def jdbc(url: String, table: String, connectionProperties: Properties): 
Unit = {
+import scala.collection.JavaConverters._
--- End diff --

Nit: I'd import these with other imports


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-08-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r76512043
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 ---
@@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions(
   val upperBound = parameters.getOrElse("upperBound", null)
   // the number of partitions
   val numPartitions = parameters.getOrElse("numPartitions", null)
+
+  if (partitionColumn != null
+  && (lowerBound == null || upperBound == null || numPartitions == 
null)) {
+  sys.error("Partitioning incompletely specified")
--- End diff --

We should be using `require` not `sys.error` everywhere


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-07-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r69267676
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
+val table = parameters.getOrElse("dbtable", sys.error("Option 
'dbtable' not specified"))
+val partitionColumn = parameters.getOrElse("partitionColumn", null)
+val lowerBound = parameters.getOrElse("lowerBound", null)
+val upperBound = parameters.getOrElse("upperBound", null)
+val numPartitions = parameters.getOrElse("numPartitions", null)
--- End diff --

I think the validation can be done together in `JDBCOptions`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-07-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r69267347
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -96,7 +97,16 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(url, table, 
properties)
+  override val schema: StructType = {
+val resolvedSchema = JDBCRDD.resolveTable(url, table, properties)
+providedSchemaOption match {
+  case Some(providedSchema) =>
+if (providedSchema.sql.toLowerCase == 
resolvedSchema.sql.toLowerCase) resolvedSchema
--- End diff --

I think `JDBCRDD.resolveTable` needs another query execution. Although it 
would be less expensive than inferring schemas in CSV or JSON, it would be 
still a bit of overhead. I am not 100% about this too. So, I think it might be 
better to be consistent with the others in this case.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-17 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r67582844
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
+val table = parameters.getOrElse("dbtable", sys.error("Option 
'dbtable' not specified"))
+val partitionColumn = parameters.getOrElse("partitionColumn", null)
+val lowerBound = parameters.getOrElse("lowerBound", null)
+val upperBound = parameters.getOrElse("upperBound", null)
+val numPartitions = parameters.getOrElse("numPartitions", null)
+
+if (partitionColumn != null
+  && (lowerBound == null || upperBound == null || numPartitions == 
null)) {
   sys.error("Partitioning incompletely specified")
 }
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(url, table, parts, properties, 
Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite   | DropTable, CreateTable,   | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val url = parameters.getOrElse("url",
+  sys.error("Saving jdbc source requires url to be set." +
+" (ie. df.option(\"url\", \"ACTUAL_URL\")"))
+val table = parameters.getOrElse("dbtable", 
parameters.getOrElse("table",
+  sys.error("Saving jdbc source requires dbtable to be set." +
+" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")))
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-17 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r67582037
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -96,7 +97,16 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(url, table, 
properties)
+  override val schema: StructType = {
+val resolvedSchema = JDBCRDD.resolveTable(url, table, properties)
+providedSchemaOption match {
+  case Some(providedSchema) =>
+if (providedSchema.sql.toLowerCase == 
resolvedSchema.sql.toLowerCase) resolvedSchema
--- End diff --

I can easily do a simpler getOrElse as is done in 
[spark-xml](https://github.com/databricks/spark-xml/blob/9f681939d16508abf4a12a129469ffebf87a2fa4/src/main/scala/com/databricks/spark/xml/XmlRelation.scala)
 which has more of a benefit of being lazier. But if an error does occur due to 
a mismatch, then the error is further from the original issue. I'm fine with 
either scenario, but at least wanted to give the other side for this one. 
Thoughts?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-17 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r67539731
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
+val table = parameters.getOrElse("dbtable", sys.error("Option 
'dbtable' not specified"))
+val partitionColumn = parameters.getOrElse("partitionColumn", null)
+val lowerBound = parameters.getOrElse("lowerBound", null)
+val upperBound = parameters.getOrElse("upperBound", null)
+val numPartitions = parameters.getOrElse("numPartitions", null)
--- End diff --

@HyukjinKwon Thanks, I did not know about this. Before I push code I was 
curious why JDBCOptions does not include the partitioning validation? That 
seems like a point of duplication also.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r66716164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -96,7 +97,16 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(url, table, 
properties)
+  override val schema: StructType = {
+val resolvedSchema = JDBCRDD.resolveTable(url, table, properties)
+providedSchemaOption match {
+  case Some(providedSchema) =>
+if (providedSchema.sql.toLowerCase == 
resolvedSchema.sql.toLowerCase) resolvedSchema
--- End diff --

I guess it would make sense if it does not try to apply the resolved schema 
when the schema is explicitly set like the other data sources.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r66716162
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
+val table = parameters.getOrElse("dbtable", sys.error("Option 
'dbtable' not specified"))
+val partitionColumn = parameters.getOrElse("partitionColumn", null)
+val lowerBound = parameters.getOrElse("lowerBound", null)
+val upperBound = parameters.getOrElse("upperBound", null)
+val numPartitions = parameters.getOrElse("numPartitions", null)
+
+if (partitionColumn != null
+  && (lowerBound == null || upperBound == null || numPartitions == 
null)) {
   sys.error("Partitioning incompletely specified")
 }
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(url, table, parts, properties, 
Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite   | DropTable, CreateTable,   | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val url = parameters.getOrElse("url",
+  sys.error("Saving jdbc source requires url to be set." +
+" (ie. df.option(\"url\", \"ACTUAL_URL\")"))
+val table = parameters.getOrElse("dbtable", 
parameters.getOrElse("table",
+  sys.error("Saving jdbc source requires dbtable to be set." +
+" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")))
+
+import collection.JavaConverters._
+val props = new Properties()
+props.putAll(parameters.asJava)
+val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+try {
+  val tableExists = 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r66716158
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
+val table = parameters.getOrElse("dbtable", sys.error("Option 
'dbtable' not specified"))
+val partitionColumn = parameters.getOrElse("partitionColumn", null)
+val lowerBound = parameters.getOrElse("lowerBound", null)
+val upperBound = parameters.getOrElse("upperBound", null)
+val numPartitions = parameters.getOrElse("numPartitions", null)
--- End diff --

There is a class for those options, `JDBCOptions`. It would be nicer if 
those options are managed in a single place.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r66716161
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.util.Properties
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, RelationProvider, 
SchemaRelationProvider}
+import org.apache.spark.sql.types.StructType
 
-class JdbcRelationProvider extends RelationProvider with 
DataSourceRegister {
+class JdbcRelationProvider extends CreatableRelationProvider
+  with SchemaRelationProvider with RelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "jdbc"
 
-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val jdbcOptions = new JDBCOptions(parameters)
-if (jdbcOptions.partitionColumn != null
-  && (jdbcOptions.lowerBound == null
-|| jdbcOptions.upperBound == null
-|| jdbcOptions.numPartitions == null)) {
+createRelation(sqlContext, parameters, null)
+  }
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
+val table = parameters.getOrElse("dbtable", sys.error("Option 
'dbtable' not specified"))
+val partitionColumn = parameters.getOrElse("partitionColumn", null)
+val lowerBound = parameters.getOrElse("lowerBound", null)
+val upperBound = parameters.getOrElse("upperBound", null)
+val numPartitions = parameters.getOrElse("numPartitions", null)
+
+if (partitionColumn != null
+  && (lowerBound == null || upperBound == null || numPartitions == 
null)) {
   sys.error("Partitioning incompletely specified")
 }
 
-val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-  null
-} else {
+val partitionInfo = if (partitionColumn == null) null
+else {
   JDBCPartitioningInfo(
-jdbcOptions.partitionColumn,
-jdbcOptions.lowerBound.toLong,
-jdbcOptions.upperBound.toLong,
-jdbcOptions.numPartitions.toInt)
+partitionColumn, lowerBound.toLong, upperBound.toLong, 
numPartitions.toInt)
 }
 val parts = JDBCRelation.columnPartition(partitionInfo)
 val properties = new Properties() // Additional properties that we 
will pass to getConnection
 parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
+JDBCRelation(url, table, parts, properties, 
Option(schema))(sqlContext.sparkSession)
+  }
+
+  /*
+   * The following structure applies to this code:
+   * |tableExists|  !tableExists
+   
*
+   * Ignore  | BaseRelation  | CreateTable, saveTable, 
BaseRelation
+   * ErrorIfExists   | ERROR | CreateTable, saveTable, 
BaseRelation
+   * Overwrite   | DropTable, CreateTable,   | CreateTable, saveTable, 
BaseRelation
+   * | saveTable, BaseRelation   |
+   * Append  | saveTable, BaseRelation   | CreateTable, saveTable, 
BaseRelation
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+val url = parameters.getOrElse("url",
+  sys.error("Saving jdbc source requires url to be set." +
+" (ie. df.option(\"url\", \"ACTUAL_URL\")"))
+val table = parameters.getOrElse("dbtable", 
parameters.getOrElse("table",
+  sys.error("Saving jdbc source requires dbtable to be set." +
+" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")))
+
+import collection.JavaConverters._
--- End diff --

I think this just can be imported at the class level rather than trying to 
import this for every time it creates a relation.


---
If your project is set up for it, you can reply to 

[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...

2016-06-06 Thread JustinPihony
Github user JustinPihony commented on a diff in the pull request:

https://github.com/apache/spark/pull/12601#discussion_r66009900
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -96,7 +97,16 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(url, table, 
properties)
+  override val schema: StructType = {
+val resolvedSchema = JDBCRDD.resolveTable(url, table, properties)
+providedSchemaOption match {
+  case Some(providedSchema) =>
+if (providedSchema.sql.toLowerCase == 
resolvedSchema.sql.toLowerCase) resolvedSchema
--- End diff --

This is the only area I'm unsure about. I'd like a second opinion on 
whether this seems ok, or if I need to build something more custom for schema 
comparison.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org