sadikovi commented on code in PR #35764:
URL: https://github.com/apache/spark/pull/35764#discussion_r980766168


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is 
date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => 
filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&
+        !filterColumns.contains(f.name) &&
+        (f.dataType.isInstanceOf[NumericType] ||
+          f.dataType.isInstanceOf[DateType] ||
+          f.dataType.isInstanceOf[TimestampType]))
+
+    if (prks.length > 0) {
+      val prk = prks.head
+      val dataType = prk.dataType
+      var lBound: String = null
+      var uBound: String = null
+      val sql = s"select min(${prk.name}) as lBound, max(${prk.name}) as 
uBound " +

Review Comment:
   Can you explain this logic in the Scaladoc for this method? Also, what 
happens if the table is empty?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column

Review Comment:
   nit: Get the min and max values for the column.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is 
date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => 
filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&
+        !filterColumns.contains(f.name) &&
+        (f.dataType.isInstanceOf[NumericType] ||
+          f.dataType.isInstanceOf[DateType] ||
+          f.dataType.isInstanceOf[TimestampType]))
+
+    if (prks.length > 0) {
+      val prk = prks.head
+      val dataType = prk.dataType
+      var lBound: String = null
+      var uBound: String = null
+      val sql = s"select min(${prk.name}) as lBound, max(${prk.name}) as 
uBound " +
+        s"from ${jdbcOptions.tableOrQuery} limit 1"
+      val conn = 
JdbcDialects.get(jdbcOptions.url).createConnectionFactory(jdbcOptions)(-1)
+      try {
+        val statement = conn.prepareStatement(sql)
+        try {
+          statement.setQueryTimeout(jdbcOptions.queryTimeout)
+          val resultSet = statement.executeQuery()
+          while (resultSet.next()) {
+            lBound = resultSet.getString("lBound")

Review Comment:
   Would it work for primary keys that are integers or timestamps?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is 
date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => 
filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&

Review Comment:
   Does the code handle composite primary keys or any multi-column indexes, 
e.g. with 2 or more columns?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala:
##########
@@ -111,6 +111,9 @@ class JDBCOptions(
   // the number of partitions
   val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
 
+  // the default number of partitions

Review Comment:
   Can you update this comment? It is unclear which default number of partitions 
this refers to: is it the overall number of partitions in the RDD, or is it 
specifically for primary keys in the table and pushed filters?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is 
date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => 
filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&
+        !filterColumns.contains(f.name) &&
+        (f.dataType.isInstanceOf[NumericType] ||
+          f.dataType.isInstanceOf[DateType] ||
+          f.dataType.isInstanceOf[TimestampType]))
+
+    if (prks.length > 0) {
+      val prk = prks.head
+      val dataType = prk.dataType
+      var lBound: String = null
+      var uBound: String = null
+      val sql = s"select min(${prk.name}) as lBound, max(${prk.name}) as 
uBound " +
+        s"from ${jdbcOptions.tableOrQuery} limit 1"
+      val conn = 
JdbcDialects.get(jdbcOptions.url).createConnectionFactory(jdbcOptions)(-1)
+      try {
+        val statement = conn.prepareStatement(sql)
+        try {
+          statement.setQueryTimeout(jdbcOptions.queryTimeout)
+          val resultSet = statement.executeQuery()
+          while (resultSet.next()) {
+            lBound = resultSet.getString("lBound")
+            uBound = resultSet.getString("uBound")
+          }
+        } catch {
+          case _: SQLException =>

Review Comment:
   Maybe it is worth at least logging the exception, but I would consider 
re-throwing it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
##########
@@ -306,6 +318,12 @@ object JdbcUtils extends Logging with SQLConfHelper {
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, 
metadata).getOrElse(
           getCatalystType(dataType, fieldSize, fieldScale, isSigned))
+      list.contains(columnName) match {

Review Comment:
   Is it the same as:
   ```scala
   metadata.putBoolean("isIndexKey", list.contains(columnName))
   ```
   
   Also, can we make `list` a `Set`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is 
date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {

Review Comment:
   Shall we return `Option[JDBCPartitioningInfo]` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to