[jira] [Commented] (SPARK-22443) AggregatedDialect doesn't override quoteIdentifier and other methods in JdbcDialects

2017-11-05 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239418#comment-16239418
 ] 

Xiao Li commented on SPARK-22443:
-

It sounds like your custom dialect is a good solution for your scenario. You 
can customize it based on your requirements.

> AggregatedDialect doesn't override quoteIdentifier and other methods in 
> JdbcDialects
> 
>
> Key: SPARK-22443
> URL: https://issues.apache.org/jira/browse/SPARK-22443
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Hongbo
> Assignee: Huaxin Gao
> Fix For: 2.3.0
>
>
> AggregatedDialect only overrides canHandle, getCatalystType, and 
> getJDBCType; it doesn't override the other methods of JdbcDialect. So if 
> multiple dialects are registered for the same driver, their implementations 
> of those methods are never invoked, and the default implementations in 
> JdbcDialect are used instead.
> Example:
> {code:java}
> package example
>
> import java.util.Properties
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
> import org.apache.spark.sql.types.{DataType, MetadataBuilder}
>
> object AnotherMySQLDialect extends JdbcDialect {
>   override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")
>
>   override def getCatalystType(
>       sqlType: Int, typeName: String, size: Int,
>       md: MetadataBuilder): Option[DataType] = {
>     None
>   }
>
>   override def quoteIdentifier(colName: String): String = {
>     s"`$colName`"
>   }
> }
>
> object App {
>   def main(args: Array[String]) {
>     val spark = SparkSession.builder
>       .master("local")
>       .appName("Simple Application")
>       .getOrCreate()
>     JdbcDialects.registerDialect(AnotherMySQLDialect)
>     val jdbcUrl = "jdbc:mysql://host:port/db?user=user&password=password"
>     spark.read.jdbc(jdbcUrl, "badge", new Properties()).show()
>   }
> }
> {code}
> will throw an exception. 
> {code:none}
> 17/11/03 17:08:39 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.sql.SQLDataException: Cannot determine value type from string 'id'
>   at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:530)
>   at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
>   at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
>   at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
>   at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
>   at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:89)
>   at com.mysql.cj.jdbc.result.ResultSetImpl.getLong(ResultSetImpl.java:853)
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:409)
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:408)
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:312)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at 

[jira] [Commented] (SPARK-22443) AggregatedDialect doesn't override quoteIdentifier and other methods in JdbcDialects

2017-11-04 Thread Hongbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239312#comment-16239312
 ] 

Hongbo commented on SPARK-22443:


In our case, we want to map MySQL YEAR to ShortType instead of DateType. 
Ideally, I'd like to write a custom dialect that only overrides the 
getCatalystType method. But that doesn't work, because the resulting 
AggregatedDialect loses the quoteIdentifier implemented in the predefined 
MySQLDialect.

We have a workaround: the custom dialect overrides all the other methods as 
well and redirects them to the implementations in MySQLDialect, and we then 
unregister MySQLDialect and register our custom dialect instead. It's not 
robust, though; if new methods are added to JdbcDialect in the future, it may 
break again. A rough sketch of the workaround is below.
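
For concreteness, a minimal sketch of that workaround (names are hypothetical; 
the YEAR branch assumes the MySQL driver reports the column's type name as 
YEAR, and the mirrored branches copy what the built-in MySQLDialect does as of 
Spark 2.2, since that object isn't public API we can delegate to directly):

{code:java}
import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types._

// Hypothetical replacement dialect: adds the YEAR -> ShortType mapping and
// re-implements the MySQL-specific behavior that would otherwise be lost.
object MySQLDialectWithYear extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int,
      md: MetadataBuilder): Option[DataType] = {
    if (typeName == "YEAR") {
      Some(ShortType)  // the custom mapping we actually want
    } else if (sqlType == Types.VARBINARY && typeName == "BIT" && size != 1) {
      md.putLong("binarylong", 1)  // mirrors the built-in MySQLDialect
      Some(LongType)
    } else if (sqlType == Types.BIT && typeName == "TINYINT") {
      Some(BooleanType)  // mirrors the built-in MySQLDialect
    } else {
      None
    }
  }

  // Mirrors the built-in MySQLDialect; without this, quoting falls back to
  // the JdbcDialect default (double quotes), which MySQL rejects.
  override def quoteIdentifier(colName: String): String = s"`$colName`"

  override def getTableExistsQuery(table: String): String =
    s"SELECT 1 FROM $table LIMIT 1"
}

// Swap the dialects. JdbcDialects.get returns the dialect currently handling
// the URL, so the built-in MySQL dialect can be unregistered without a
// direct reference to it.
val builtIn = JdbcDialects.get("jdbc:mysql://host/db")
JdbcDialects.unregisterDialect(builtIn)
JdbcDialects.registerDialect(MySQLDialectWithYear)
{code}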


[jira] [Commented] (SPARK-22443) AggregatedDialect doesn't override quoteIdentifier and other methods in JdbcDialects

2017-11-04 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239133#comment-16239133
 ] 

Sean Owen commented on SPARK-22443:
---

The semantics here are already odd. The methods might already return values 
from different implementations, and there's no reason to expect those to be 
consistent or compatible. There's also no way to know whether a method is 
overridden (short of reflection).

I'm not actually sure what the use case for this is, but I assume it's 
intended to wrap a dialect with another dialect as a fallback. If so, it makes 
sense to always prefer the first dialect's value, which is consistent with how 
the aggregate already takes the first implementation that returns a non-None 
value.

Returning the first non-null value seems reasonable too, OK.
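
For illustration, a minimal sketch of what that first-dialect delegation could 
look like inside AggregatedDialect (hypothetical, not the actual patch; the 
getCatalystType body is how the aggregate already picks the first non-None 
result):

{code:java}
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.{DataType, MetadataBuilder}

class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
  require(dialects.nonEmpty)

  // The aggregate only handles a URL if every wrapped dialect does.
  override def canHandle(url: String): Boolean =
    dialects.map(_.canHandle(url)).reduce(_ && _)

  // Existing behavior: the first non-None answer wins.
  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int,
      md: MetadataBuilder): Option[DataType] =
    dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption

  // Proposed: delegate the string-producing methods to the first dialect
  // instead of silently falling back to JdbcDialect's defaults.
  override def quoteIdentifier(colName: String): String =
    dialects.head.quoteIdentifier(colName)

  override def getTableExistsQuery(table: String): String =
    dialects.head.getTableExistsQuery(table)

  override def getSchemaQuery(table: String): String =
    dialects.head.getSchemaQuery(table)
}
{code}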

[jira] [Commented] (SPARK-22443) AggregatedDialect doesn't override quoteIdentifier and other methods in JdbcDialects

2017-11-04 Thread Hongbo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239097#comment-16239097
 ] 

Hongbo commented on SPARK-22443:


[~srowen] Thanks for the quick response!

I think returning the first dialect's value is an acceptable solution. But I 
was wondering whether it could be done better.

Suppose the first dialect doesn't override, e.g., the quoteIdentifier method, 
but the second dialect does. Naturally, using the second dialect's 
implementation would be better. But then the default implementation in the 
base JdbcDialect class is used instead.

Maybe new dialects could derive from another base class whose string methods 
return null (I hate null, but wrapping the results in Option would change the 
external API)? AggregatedDialect could then return the first non-null result, 
and if all the dialects return null, fall back to the default implementation 
in NoopDialect (a trivial concrete object derived from JdbcDialect). A rough 
sketch of what I mean is below.

Just my two cents.
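
Hypothetically, something like this (it assumes a new base class whose string 
methods return null to signal "not overridden", which is not how JdbcDialect 
works today; all names are illustrative):

{code:java}
import org.apache.spark.sql.jdbc.JdbcDialect

// Trivial concrete dialect that keeps all of JdbcDialect's defaults; used
// only as the fallback when no wrapped dialect overrides a method.
object NoopDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = true
}

// Hypothetical aggregate over dialects that return null when they don't
// override a string method: take the first non-null answer, else the default.
class FirstNonNullDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    dialects.forall(_.canHandle(url))

  override def quoteIdentifier(colName: String): String =
    dialects.iterator
      .map(_.quoteIdentifier(colName))
      .find(_ != null)
      .getOrElse(NoopDialect.quoteIdentifier(colName))

  override def getTableExistsQuery(table: String): String =
    dialects.iterator
      .map(_.getTableExistsQuery(table))
      .find(_ != null)
      .getOrElse(NoopDialect.getTableExistsQuery(table))
}
{code}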

[jira] [Commented] (SPARK-22443) AggregatedDialect doesn't override quoteIdentifier and other methods in JdbcDialects

2017-11-04 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238858#comment-16238858
 ] 

Apache Spark commented on SPARK-22443:
--

User 'huaxingao' has created a pull request for this issue:
https://github.com/apache/spark/pull/19658

[jira] [Commented] (SPARK-22443) AggregatedDialect doesn't override quoteIdentifier and other methods in JdbcDialects

2017-11-03 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238482#comment-16238482
 ] 

Sean Owen commented on SPARK-22443:
---

Good catch. I suppose that this and getTableExistsQuery and getSchemaQuery need 
to return the value from the first dialect?
