huaxingao commented on a change in pull request #34164:
URL: https://github.com/apache/spark/pull/34164#discussion_r721903259



##########
File path: 
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       testCreateTableWithProperty(s"$catalogName.new_table")
     }
   }
+
+  def testIndex(tbl: String): Unit = {}
+
+  test("Test INDEX") {

Review comment:
       Done

##########
File path: 
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       testCreateTableWithProperty(s"$catalogName.new_table")
     }
   }
+
+  def testIndex(tbl: String): Unit = {}

Review comment:
       I intentionally left it empty so that subclasses (e.g. 
`MySQLIntegrationSuite`) can implement it. 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
##########
@@ -48,4 +52,50 @@ case class JDBCTable(ident: Identifier, schema: StructType, 
jdbcOptions: JDBCOpt
       jdbcOptions.parameters.originalMap ++ 
info.options.asCaseSensitiveMap().asScala)
     JDBCWriteBuilder(schema, mergedOptions)
   }
+
+  override def createIndex(
+      indexName: String,
+      indexType: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): Unit = {
+    withConnection { conn =>
+      classifyException(s"Failed to create index: $indexName in $name") {
+        JdbcUtils.createIndex(
+          conn, indexName, indexType, name, columns, columnsProperties, 
properties, jdbcOptions)
+      }
+    }
+  }
+
+  override def indexExists(indexName: String): Boolean = {
+    withConnection { conn =>
+      JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+    }
+  }
+
+  override def dropIndex(indexName: String): Boolean = {
+    throw new UnsupportedOperationException("dropIndex is not supported yet")
+  }
+
+  override def listIndexes(): Array[TableIndex] = {
+    throw new UnsupportedOperationException("listIndexes is not supported yet")
+  }
+
+  private def withConnection[T](f: Connection => T): T = {
+    val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
+    try {
+      f(conn)
+    } finally {
+      conn.close()
+    }
+  }
+
+  private def classifyException[T](message: String)(f: => T): T = {

Review comment:
       Great idea! Fixed.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with 
Logging{
     s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
   }
 
+  /**
+   * Creates an index.
+   *
+   * @param indexName         the name of the index to be created
+   * @param indexType         the IndexType of the index to be created

Review comment:
       Done

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with 
Logging{
     s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
   }
 
+  /**
+   * Creates an index.
+   *
+   * @param indexName         the name of the index to be created
+   * @param indexType         the IndexType of the index to be created
+   * @param tableName         the table on which index to be created
+   * @param columns           the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to 
be created
+   * @param properties        the properties of the index to be created
+   */
+  def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    throw new UnsupportedOperationException("Create index is not supported")
+  }
+
+  /**
+   * Checks whether an index exists
+   *
+   * @param indexName the name of the index
+   * @param tableName the table on which index to be checked

Review comment:
       Done

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with 
Logging{
     s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
   }
 
+  /**
+   * Creates an index.
+   *
+   * @param indexName         the name of the index to be created
+   * @param indexType         the IndexType of the index to be created
+   * @param tableName         the table on which index to be created
+   * @param columns           the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to 
be created
+   * @param properties        the properties of the index to be created
+   */
+  def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    throw new UnsupportedOperationException("Create index is not supported")
+  }
+
+  /**
+   * Checks whether an index exists
+   *
+   * @param indexName the name of the index
+   * @param tableName the table on which index to be checked
+   * @param options JDBCOptions of the table
+   * @return true if the index exists, false otherwise

Review comment:
       Fixed. 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with 
Logging{
     s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
   }
 
+  /**
+   * Creates an index.
+   *
+   * @param indexName         the name of the index to be created
+   * @param indexType         the IndexType of the index to be created
+   * @param tableName         the table on which index to be created
+   * @param columns           the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to 
be created
+   * @param properties        the properties of the index to be created
+   */
+  def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    throw new UnsupportedOperationException("Create index is not supported")
+  }
+
+  /**
+   * Checks whether an index exists
+   *
+   * @param indexName the name of the index
+   * @param tableName the table on which index to be checked
+   * @param options JDBCOptions of the table
+   * @return true if the index exists, false otherwise
+   */
+  def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    throw new UnsupportedOperationException("indexExists is not supported")

Review comment:
       Fixed

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
##########
@@ -102,4 +109,59 @@ private case object MySQLDialect extends JdbcDialect {
     case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
     case _ => JdbcUtils.getCommonJDBCType(dt)
   }
+
+  // CREATE INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
+  override def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+    var indexProperties: String = ""
+    val scalaProps = properties.asScala
+    if (!properties.isEmpty) {
+      scalaProps.foreach { case (k, v) =>
+        indexProperties = indexProperties + " " + s"$k $v"
+      }
+    }
+
+    // columnsProperties doesn't apply to MySQL so it is ignored
+    s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
+      s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) 
$indexProperties"
+  }
+
+  // SHOW INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    val sql = s"SHOW INDEXES FROM $tableName"

Review comment:
       Good catch! Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to