MaxGekk commented on code in PR #49029:
URL: https://github.com/apache/spark/pull/49029#discussion_r1887233636


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala:
##########
@@ -38,6 +39,7 @@ import org.apache.spark.sql.execution.command.CommandCheck
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck
 import org.apache.spark.sql.execution.streaming.ResolveWriteToStream
+import org.apache.spark.sql.hive.DataSourceWithHiveResolver

Review Comment:
   Not needed, since it's in the same package.
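   For illustration, a minimal sketch (with a hypothetical `Example` class): since both files declare `package org.apache.spark.sql.hive`, the class is visible without an import.
   ```scala
   package org.apache.spark.sql.hive

   // Same package as DataSourceWithHiveResolver, so no import is needed.
   class Example(session: org.apache.spark.sql.SparkSession, catalog: HiveSessionCatalog) {
     val resolver = new DataSourceWithHiveResolver(session, catalog)
   }
   ```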



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/DataSourceWithHiveResolver.scala:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{DataSourceResolver, LogicalRelation}
+
+/**
+ * [[DataSourceWithHiveResolver]] is a [[DataSourceResolver]] that additionally handles
+ * [[HiveTableRelation]] conversion using [[RelationConversions]].
+ */
+class DataSourceWithHiveResolver(sparkSession: SparkSession, hiveCatalog: HiveSessionCatalog)
+    extends DataSourceResolver(sparkSession) {
+  val relationConversions = new RelationConversions(hiveCatalog)

Review Comment:
   `new` is not necessary since `RelationConversions` is a case class. 
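   A sketch of the call through the generated companion `apply` (assuming `RelationConversions` stays a case class):
   ```scala
   // Case classes come with a companion `apply`, so `new` can be dropped:
   private val relationConversions = RelationConversions(hiveCatalog)
   ```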



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ResolverSuite.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql

Review Comment:
   Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver'.
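   Presumably the fix is either moving the file or aligning the declaration with the path, e.g. (sketch):
   ```scala
   // Matches sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/
   package org.apache.spark.sql.analysis.resolver
   ```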



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql

Review Comment:
   Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver'.



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.resolver.{
+  AnalyzerBridgeState,
+  BridgedRelationMetadataProvider,
+  MetadataResolver,
+  RelationId,
+  Resolver
+}
+import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation
+import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.{FileResolver, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class MetadataResolverSuite extends QueryTest with SharedSparkSession with SQLTestUtils {
+  private val keyValueTableSchema = StructType(
+    Seq(
+      StructField("key", IntegerType, true),
+      StructField("value", StringType, true)
+    )
+  )
+  private val fileTableSchema = StructType(
+    Seq(
+      StructField("id", LongType, true)
+    )
+  )
+
+  test("Single CSV relation") {
+    withTable("src_csv") {
+      spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect()
+
+      checkResolveUnresolvedCatalogRelation(
+        sqlText = "SELECT * FROM src_csv",
+        expectedTableData = Seq(createTableData("src_csv"))
+      )
+    }
+  }
+
+  test("Single ORC relation") {
+    withTable("src_orc") {
+      spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect()
+
+      checkResolveUnresolvedCatalogRelation(
+        sqlText = "SELECT * FROM src_orc",
+        expectedTableData = Seq(createTableData("src_orc"))
+      )
+    }
+  }
+
+  test("Relation inside an EXISTS subquery") {
+    withTable("src") {
+      spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect()
+
+      checkResolveUnresolvedCatalogRelation(
+        sqlText = "SELECT * FROM VALUES (1) WHERE EXISTS (SELECT col1 FROM src)",
+        expectedTableData = Seq(createTableData("src"))
+      )
+    }
+  }
+
+  test("Relation inside an IN subquery") {
+    withTable("src") {
+      spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect()
+
+      checkResolveUnresolvedCatalogRelation(
+        sqlText = "SELECT * FROM VALUES (1) WHERE col1 IN (SELECT col1 FROM src)",
+        expectedTableData = Seq(createTableData("src"))
+      )
+    }
+  }
+
+  test("Relation inside a nested subquery expression") {
+    withTable("src") {
+      spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect()
+
+      checkResolveUnresolvedCatalogRelation(
+        sqlText = """
+          SELECT
+            col1 + (
+              SELECT 35 * (
+                SELECT key FROM src LIMIT 1
+              ) * col1 FROM VALUES (2)
+            )
+          FROM
+            VALUES (1)
+          """,
+        expectedTableData = Seq(createTableData("src"))
+      )
+    }
+  }
+
+  test("Relation from a file") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      checkResolveLogicalRelation(
+        sqlText = s"select id from json.`${f.getCanonicalPath}`",
+        expectedTableData = Seq(
+          RelationId(
+            multipartIdentifier = Seq("spark_catalog", "json", s"${f.getCanonicalPath}")
+          ) -> TestTableData(
+            name = s"file:${f.getCanonicalPath}",
+            schema = fileTableSchema
+          )
+        )
+      )
+    })
+  }
+
+  test("Relation bridged from legacy Analyzer") {
+    withTable("src") {
+      spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect()
+
+      val analyzerBridgeState = new AnalyzerBridgeState
+      analyzerBridgeState.relationsWithResolvedMetadata.put(
+        UnresolvedRelation(Seq("src")),
+        createUnresolvedCatalogRelation("src")
+      )
+
+      checkResolveUnresolvedCatalogRelation(
+        sqlText = "SELECT * FROM src",
+        expectedTableData = Seq(createTableData("src")),
+        analyzerBridgeState = Some(analyzerBridgeState)
+      )
+    }
+  }
+
+  test("Relation not bridged from legacy Analyzer") {
+    withTable("src") {
+      spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect()
+
+      val analyzerBridgeState = new AnalyzerBridgeState
+
+      checkResolveUnresolvedCatalogRelation(
+        sqlText = "SELECT * FROM src",
+        expectedTableData = Seq.empty,
+        analyzerBridgeState = Some(new AnalyzerBridgeState)
+      )
+    }
+  }
+
+  private def checkResolveUnresolvedCatalogRelation(
+      sqlText: String,
+      expectedTableData: Seq[(RelationId, TestTableData)],
+      analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = {
+    checkResolve(
+      sqlText,
+      expectedTableData,
+      (relation) =>
+        relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.identifier.unquotedString,
+      (relation) => relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.schema,
+      analyzerBridgeState
+    )
+  }
+
+  private def checkResolveLogicalRelation(
+      sqlText: String,
+      expectedTableData: Seq[(RelationId, TestTableData)],
+      analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = {
+    checkResolve(
+      sqlText,
+      expectedTableData,
+      (relation) =>
+        relation
+          .asInstanceOf[LogicalRelation]
+          .relation
+          .asInstanceOf[HadoopFsRelation]
+          .location
+          .rootPaths
+          .mkString(","),
+      (relation) => relation.asInstanceOf[LogicalRelation].relation.schema,

Review Comment:
   ```suggestion
         relation => relation.asInstanceOf[LogicalRelation].relation.schema,
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala:
##########
@@ -0,0 +1,278 @@
+  private def checkResolveUnresolvedCatalogRelation(
+      sqlText: String,
+      expectedTableData: Seq[(RelationId, TestTableData)],
+      analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = {
+    checkResolve(
+      sqlText,
+      expectedTableData,
+      (relation) =>
+        relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.identifier.unquotedString,
+      (relation) => relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.schema,

Review Comment:
   ```suggestion
         relation => relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.schema,
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala:
##########
@@ -0,0 +1,278 @@
+  test("Relation not bridged from legacy Analyzer") {
+    withTable("src") {
+      spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect()
+
+      val analyzerBridgeState = new AnalyzerBridgeState
Review Comment:
   Is it used anywhere? If not, please remove it.
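   If it is indeed unused, the test would presumably reduce to (sketch, reusing the code above):
   ```scala
   test("Relation not bridged from legacy Analyzer") {
     withTable("src") {
       spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect()

       checkResolveUnresolvedCatalogRelation(
         sqlText = "SELECT * FROM src",
         expectedTableData = Seq.empty,
         analyzerBridgeState = Some(new AnalyzerBridgeState)
       )
     }
   }
   ```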



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ResolverGuardSuite.scala:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql

Review Comment:
   Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver'.



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql

Review Comment:
   Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver'.



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala:
##########
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql

Review Comment:
   Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver'.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileResolverSuite.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+
+class FileResolverSuite extends QueryTest with SharedSparkSession {
+  private val tableSchema = StructType(
+    Seq(
+      StructField("id", LongType, true)
+    )
+  )

Review Comment:
   `nullable` is true by default:
   ```suggestion
     private val tableSchema = new StructType().add("id", LongType)
   ```
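   The same builder style would presumably also shorten the two-column schemas in the other suites, e.g. (sketch):
   ```scala
   // Equivalent to StructType(Seq(StructField("key", IntegerType, true), ...));
   // add() also defaults nullable to true.
   private val keyValueTableSchema = new StructType()
     .add("key", IntegerType)
     .add("value", StringType)
   ```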



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceResolverSuite.scala:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.resolver.{MetadataResolver, Resolver}
+import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+class DataSourceResolverSuite extends QueryTest with SharedSparkSession {
+  private val keyValueTableSchema = StructType(
+    Seq(
+      StructField("key", IntegerType, true),
+      StructField("value", StringType, true)
+    )
+  )
+
+  test("CSV relation") {
+    withTable("src_csv") {
+      spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect()
+
+      checkResolveOperator(
+        sqlText = "SELECT * FROM src_csv",
+        expectedTableName = "spark_catalog.default.src_csv",
+        expectedTableSchema = keyValueTableSchema
+      )
+    }
+  }
+
+  test("JSON relation") {
+    withTable("src_json") {
+      spark.sql("CREATE TABLE src_json (key INT, value STRING) USING JSON;").collect()
+
+      checkResolveOperator(
+        sqlText = "SELECT * FROM src_json",
+        expectedTableName = "spark_catalog.default.src_json",
+        expectedTableSchema = keyValueTableSchema
+      )
+    }
+  }
+
+  test("PARQUET relation") {
+    withTable("src_parquet") {
+      spark.sql("CREATE TABLE src_parquet (key INT, value STRING) USING PARQUET;").collect()
+
+      checkResolveOperator(
+        sqlText = "SELECT * FROM src_parquet",
+        expectedTableName = "spark_catalog.default.src_parquet",
+        expectedTableSchema = keyValueTableSchema
+      )
+    }
+  }
+
+  test("ORC relation") {
+    withTable("src_orc") {
+      spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect()
+
+      checkResolveOperator(
+        sqlText = "SELECT * FROM src_orc",
+        expectedTableName = "spark_catalog.default.src_orc",
+        expectedTableSchema = keyValueTableSchema
+      )
+    }
+  }
+
+  private def checkResolveOperator(
+      sqlText: String,
+      expectedTableName: String,
+      expectedTableSchema: StructType) = {
+    val metadataResolver = new MetadataResolver(
+      spark.sessionState.catalogManager,
+      Resolver.createRelationResolution(spark.sessionState.catalogManager)
+    )
+    val dataSourceResolver = new DataSourceResolver(spark)
+
+    val unresolvedPlan = spark.sql(sqlText).queryExecution.logical
+
+    metadataResolver.resolve(unresolvedPlan)
+
+    val unresolvedRelations = unresolvedPlan.collect {
+      case unresolvedRelation: UnresolvedRelation => unresolvedRelation
+    }
+    assert(unresolvedRelations.size == 1)
+
+    val partiallyResolvedRelation = metadataResolver
+      .getRelationWithResolvedMetadata(unresolvedRelations.head)
+      .get
+      .asInstanceOf[SubqueryAlias]
+      .child
+    assert(partiallyResolvedRelation.isInstanceOf[UnresolvedCatalogRelation])
+
+    var result = dataSourceResolver.resolveOperator(partiallyResolvedRelation)

Review Comment:
   ```suggestion
       val result = dataSourceResolver.resolveOperator(partiallyResolvedRelation)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql

Review Comment:
   Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver'.



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ResolverSuite.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.resolver.{Resolver, ResolverExtension}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.IntegerType
+
+class ResolverSuite extends QueryTest with SharedSparkSession {
+  private val col1Integer = AttributeReference("col1", IntegerType)()
+
+  test("Node matched the extension") {
+    val resolver = createResolver(
+      Seq(
+        new NoopResolver,
+        new TestRelationResolver
+      )
+    )
+
+    val result = resolver.lookupMetadataAndResolve(
+      Project(
+        Seq(UnresolvedAttribute("col1")),
+        TestRelation(resolutionDone = false, output = Seq(col1Integer))
+      )
+    )
+    assert(
+      result == Project(
+        Seq(col1Integer),
+        TestRelation(resolutionDone = true, output = Seq(col1Integer))
+      )
+    )
+  }
+
+  test("Node didn't match the extension") {
+    val resolver = createResolver(
+      Seq(
+        new NoopResolver,
+        new TestRelationResolver
+      )
+    )
+
+    checkError(
+      exception = intercept[AnalysisException](
+        resolver.lookupMetadataAndResolve(
+          Project(
+            Seq(UnresolvedAttribute("col1")),
+            UnknownRelation(output = Seq(col1Integer))
+          )
+        )
+      ),
+      condition = "UNSUPPORTED_SINGLE_PASS_ANALYZER_FEATURE",
+      parameters = Map(
+        "feature" -> "class org.apache.spark.sql.ResolverSuite$UnknownRelation operator resolution"
+      )
+    )
+  }
+
+  test("Ambiguous extensions") {
+    val resolver = createResolver(
+      Seq(
+        new NoopResolver,
+        new TestRelationResolver,
+        new TestRelationBrokenResolver
+      )
+    )
+
+    checkError(
+      exception = intercept[AnalysisException](
+        resolver.lookupMetadataAndResolve(
+          Project(
+            Seq(UnresolvedAttribute("col1")),
+            TestRelation(resolutionDone = false, output = Seq(col1Integer))
+          )
+        )
+      ),
+      condition = "AMBIGUOUS_RESOLVER_EXTENSION",
+      parameters = Map(
+        "operator" -> "org.apache.spark.sql.ResolverSuite$TestRelation",
+        "extensions" -> "TestRelationResolver, TestRelationBrokenResolver"
+      )
+    )
+  }
+
+  private def createResolver(extensions: Seq[ResolverExtension] = Seq.empty): Resolver = {
+    new Resolver(spark.sessionState.catalogManager, extensions)
+  }
+
+  private class TestRelationResolver extends ResolverExtension {
+    var timesCalled = 0
+
+    override def resolveOperator: PartialFunction[LogicalPlan, LogicalPlan] = {
+      case testNode: TestRelation if countTimesCalled() =>
+        testNode.copy(resolutionDone = true)
+    }
+
+    private def countTimesCalled(): Boolean = {
+      timesCalled += 1
+      assert(timesCalled == 1)
+      true
+    }
+  }
+
+  private class TestRelationBrokenResolver extends ResolverExtension {
+    override def resolveOperator: PartialFunction[LogicalPlan, LogicalPlan] = {
+      case testNode: TestRelation =>
+        assert(false)
+        testNode
+    }
+  }
+
+  private class NoopResolver extends ResolverExtension {
+    override def resolveOperator: PartialFunction[LogicalPlan, LogicalPlan] = {
+      case node: LogicalPlan if false =>
+        assert(false)
+        node
+    }
+  }
+
+  private case class TestRelation(
+      val resolutionDone: Boolean,

Review Comment:
   Do you access this from somewhere? It seems `val` is not needed:
   ```suggestion
         resolutionDone: Boolean,
   ```
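   Case-class constructor parameters are public `val`s by default, so the explicit modifier adds nothing. A quick sketch with a hypothetical `Point`:
   ```scala
   case class Point(x: Int)

   // `x` is already a readable field without declaring it `val`:
   assert(Point(x = 1).x == 1)
   ```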



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ExplicitlyUnsupportedResolverFeatureSuite.scala:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql

Review Comment:
   Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver'.



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala:
##########
@@ -0,0 +1,278 @@
+  private def checkResolveUnresolvedCatalogRelation(
+      sqlText: String,
+      expectedTableData: Seq[(RelationId, TestTableData)],
+      analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = {
+    checkResolve(
+      sqlText,
+      expectedTableData,
+      (relation) =>

Review Comment:
   ```suggestion
         relation =>
   ```



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/DataSourceWithHiveResolver.scala:
##########
@@ -0,0 +1,57 @@
+class DataSourceWithHiveResolver(sparkSession: SparkSession, hiveCatalog: HiveSessionCatalog)
+    extends DataSourceResolver(sparkSession) {
+  val relationConversions = new RelationConversions(hiveCatalog)

Review Comment:
   ```suggestion
     private val relationConversions = new RelationConversions(hiveCatalog)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala:
##########
@@ -0,0 +1,278 @@
+  private def checkResolveLogicalRelation(
+      sqlText: String,
+      expectedTableData: Seq[(RelationId, TestTableData)],
+      analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = {
+    checkResolve(
+      sqlText,
+      expectedTableData,
+      (relation) =>

Review Comment:
   ```suggestion
         relation =>
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolver.scala:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.resolver.{
+  ExplicitlyUnsupportedResolverFeature,
+  ResolverExtension
+}
+import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+
+/**
+ * The [[DataSourceResolver]] is a [[Resolver]] extension that resolves nodes defined in the
+ * [[datasources]] package. We have it as an extension to avoid cyclic dependencies between
+ * [[resolver]] and [[datasources]] packages.
+ */
+class DataSourceResolver(sparkSession: SparkSession) extends ResolverExtension {
+  val findDataSourceTable = new FindDataSourceTable(sparkSession)

Review Comment:
   Could be private:
   ```suggestion
     private val findDataSourceTable = new FindDataSourceTable(sparkSession)
   ```
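   For context, the contract these extensions implement (as exercised in `ResolverSuite` above) is just a partial function over logical plans; a minimal no-op sketch with a hypothetical `PassthroughResolver`:
   ```scala
   import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

   // Minimal ResolverExtension: claims only plans that are already resolved
   // and returns them unchanged.
   class PassthroughResolver extends ResolverExtension {
     override def resolveOperator: PartialFunction[LogicalPlan, LogicalPlan] = {
       case plan: LogicalPlan if plan.resolved => plan
     }
   }
   ```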



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
