huaxingao commented on a change in pull request #34164:
URL: https://github.com/apache/spark/pull/34164#discussion_r721903259
##########
File path:
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
+
+ def testIndex(tbl: String): Unit = {}
+
+ test("Test INDEX") {
Review comment:
Done
##########
File path:
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
+
+ def testIndex(tbl: String): Unit = {}
Review comment:
I intentionally left the base implementation empty so that each subclass (e.g.
`MySQLIntegrationSuite`) can override it.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
##########
@@ -48,4 +52,50 @@ case class JDBCTable(ident: Identifier, schema: StructType,
jdbcOptions: JDBCOpt
jdbcOptions.parameters.originalMap ++
info.options.asCaseSensitiveMap().asScala)
JDBCWriteBuilder(schema, mergedOptions)
}
+
+ override def createIndex(
+ indexName: String,
+ indexType: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): Unit = {
+ withConnection { conn =>
+ classifyException(s"Failed to create index: $indexName in $name") {
+ JdbcUtils.createIndex(
+ conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions)
+ }
+ }
+ }
+
+ override def indexExists(indexName: String): Boolean = {
+ withConnection { conn =>
+ JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+ }
+ }
+
+ override def dropIndex(indexName: String): Boolean = {
+ throw new UnsupportedOperationException("dropIndex is not supported yet")
+ }
+
+ override def listIndexes(): Array[TableIndex] = {
+ throw new UnsupportedOperationException("listIndexes is not supported yet")
+ }
+
+ private def withConnection[T](f: Connection => T): T = {
+ val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
+ try {
+ f(conn)
+ } finally {
+ conn.close()
+ }
+ }
+
+ private def classifyException[T](message: String)(f: => T): T = {
Review comment:
Great idea! Fixed.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with
Logging{
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
}
+ /**
+ * Creates an index.
+ *
+ * @param indexName the name of the index to be created
+ * @param indexType the IndexType of the index to be created
Review comment:
Done
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with
Logging{
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
}
+ /**
+ * Creates an index.
+ *
+ * @param indexName the name of the index to be created
+ * @param indexType the IndexType of the index to be created
+ * @param tableName the table on which index to be created
+ * @param columns the columns on which index to be created
+ * @param columnsProperties the properties of the columns on which index to be created
+ * @param properties the properties of the index to be created
+ */
+ def createIndex(
+ indexName: String,
+ indexType: String,
+ tableName: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): String = {
+ throw new UnsupportedOperationException("Create index is not supported")
+ }
+
+ /**
+ * Checks whether an index exists
+ *
+ * @param indexName the name of the index
+ * @param tableName the table on which index to be checked
Review comment:
Done
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with
Logging{
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
}
+ /**
+ * Creates an index.
+ *
+ * @param indexName the name of the index to be created
+ * @param indexType the IndexType of the index to be created
+ * @param tableName the table on which index to be created
+ * @param columns the columns on which index to be created
+ * @param columnsProperties the properties of the columns on which index to be created
+ * @param properties the properties of the index to be created
+ */
+ def createIndex(
+ indexName: String,
+ indexType: String,
+ tableName: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): String = {
+ throw new UnsupportedOperationException("Create index is not supported")
+ }
+
+ /**
+ * Checks whether an index exists
+ *
+ * @param indexName the name of the index
+ * @param tableName the table on which index to be checked
+ * @param options JDBCOptions of the table
+ * @return true if the index exists, false otherwise
Review comment:
Fixed.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -287,6 +289,42 @@ abstract class JdbcDialect extends Serializable with
Logging{
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
}
+ /**
+ * Creates an index.
+ *
+ * @param indexName the name of the index to be created
+ * @param indexType the IndexType of the index to be created
+ * @param tableName the table on which index to be created
+ * @param columns the columns on which index to be created
+ * @param columnsProperties the properties of the columns on which index to be created
+ * @param properties the properties of the index to be created
+ */
+ def createIndex(
+ indexName: String,
+ indexType: String,
+ tableName: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): String = {
+ throw new UnsupportedOperationException("Create index is not supported")
+ }
+
+ /**
+ * Checks whether an index exists
+ *
+ * @param indexName the name of the index
+ * @param tableName the table on which index to be checked
+ * @param options JDBCOptions of the table
+ * @return true if the index exists, false otherwise
+ */
+ def indexExists(
+ conn: Connection,
+ indexName: String,
+ tableName: String,
+ options: JDBCOptions): Boolean = {
+ throw new UnsupportedOperationException("indexExists is not supported")
Review comment:
Fixed
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
##########
@@ -102,4 +109,59 @@ private case object MySQLDialect extends JdbcDialect {
case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
case _ => JdbcUtils.getCommonJDBCType(dt)
}
+
+ // CREATE INDEX syntax
+ // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
+ override def createIndex(
+ indexName: String,
+ indexType: String,
+ tableName: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): String = {
+ val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+ var indexProperties: String = ""
+ val scalaProps = properties.asScala
+ if (!properties.isEmpty) {
+ scalaProps.foreach { case (k, v) =>
+ indexProperties = indexProperties + " " + s"$k $v"
+ }
+ }
+
+ // columnsProperties doesn't apply to MySQL so it is ignored
+ s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
+ s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) $indexProperties"
+ }
+
+ // SHOW INDEX syntax
+ // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+ override def indexExists(
+ conn: Connection,
+ indexName: String,
+ tableName: String,
+ options: JDBCOptions): Boolean = {
+ val sql = s"SHOW INDEXES FROM $tableName"
Review comment:
Good catch! Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]