MaxGekk commented on code in PR #49029: URL: https://github.com/apache/spark/pull/49029#discussion_r1887233636
########## sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala: ########## @@ -38,6 +39,7 @@ import org.apache.spark.sql.execution.command.CommandCheck import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck import org.apache.spark.sql.execution.streaming.ResolveWriteToStream +import org.apache.spark.sql.hive.DataSourceWithHiveResolver Review Comment: Not needed, as it's in the same package. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/DataSourceWithHiveResolver.scala: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{DataSourceResolver, LogicalRelation} + +/** + * [[DataSourceWithHiveResolver]] is a [[DataSourceResolver]] that additionally handles + * [[HiveTableRelation]] conversion using [[RelationConversions]]. + */ +class DataSourceWithHiveResolver(sparkSession: SparkSession, hiveCatalog: HiveSessionCatalog) + extends DataSourceResolver(sparkSession) { + val relationConversions = new RelationConversions(hiveCatalog) Review Comment: `new` is not necessary since `RelationConversions` is a case class. ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ResolverSuite.scala: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql Review Comment: Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver' ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql Review Comment: Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver' ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.resolver.{ + AnalyzerBridgeState, + BridgedRelationMetadataProvider, + MetadataResolver, + RelationId, + Resolver +} +import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.{FileResolver, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class MetadataResolverSuite extends QueryTest with SharedSparkSession with SQLTestUtils { + private val keyValueTableSchema = StructType( + Seq( + StructField("key", IntegerType, true), + StructField("value", StringType, true) + ) + ) + private val fileTableSchema = StructType( + Seq( + StructField("id", LongType, true) + ) + ) + + test("Single CSV relation") { + withTable("src_csv") { + spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_csv", + expectedTableData = Seq(createTableData("src_csv")) + ) + } + } + + test("Single ORC relation") { + withTable("src_orc") { + spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_orc", + expectedTableData = Seq(createTableData("src_orc")) + ) + } + } + + test("Relation inside an EXISTS subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE EXISTS (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside an IN subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE col1 IN (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside a nested subquery expression") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = """ + SELECT + col1 + ( + SELECT 35 * ( + SELECT key FROM src LIMIT 1 + ) * col1 FROM VALUES (2) + ) + FROM + VALUES (1) + """, + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation from a file") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.json(f.getCanonicalPath) + checkResolveLogicalRelation( + sqlText = s"select id from json.`${f.getCanonicalPath}`", + expectedTableData = Seq( + RelationId( + multipartIdentifier = Seq("spark_catalog", "json", s"${f.getCanonicalPath}") + ) -> TestTableData( + name = s"file:${f.getCanonicalPath}", + schema = fileTableSchema + ) + ) + ) + }) + } + + test("Relation bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new 
AnalyzerBridgeState + analyzerBridgeState.relationsWithResolvedMetadata.put( + UnresolvedRelation(Seq("src")), + createUnresolvedCatalogRelation("src") + ) + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq(createTableData("src")), + analyzerBridgeState = Some(analyzerBridgeState) + ) + } + } + + test("Relation not bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new AnalyzerBridgeState + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq.empty, + analyzerBridgeState = Some(new AnalyzerBridgeState) + ) + } + } + + private def checkResolveUnresolvedCatalogRelation( + sqlText: String, + expectedTableData: Seq[(RelationId, TestTableData)], + analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = { + checkResolve( + sqlText, + expectedTableData, + (relation) => + relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.identifier.unquotedString, + (relation) => relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.schema, + analyzerBridgeState + ) + } + + private def checkResolveLogicalRelation( + sqlText: String, + expectedTableData: Seq[(RelationId, TestTableData)], + analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = { + checkResolve( + sqlText, + expectedTableData, + (relation) => + relation + .asInstanceOf[LogicalRelation] + .relation + .asInstanceOf[HadoopFsRelation] + .location + .rootPaths + .mkString(","), + (relation) => relation.asInstanceOf[LogicalRelation].relation.schema, Review Comment: ```suggestion relation => relation.asInstanceOf[LogicalRelation].relation.schema, ``` ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.resolver.{ + AnalyzerBridgeState, + BridgedRelationMetadataProvider, + MetadataResolver, + RelationId, + Resolver +} +import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.{FileResolver, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class MetadataResolverSuite extends QueryTest with SharedSparkSession with SQLTestUtils { + private val keyValueTableSchema = StructType( + Seq( + StructField("key", IntegerType, true), + StructField("value", StringType, true) + ) + ) + private val fileTableSchema = StructType( + Seq( + StructField("id", LongType, true) + ) + ) + + test("Single CSV relation") { + withTable("src_csv") { + spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_csv", + expectedTableData = Seq(createTableData("src_csv")) + ) + } + } + + test("Single ORC relation") { + withTable("src_orc") { + spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_orc", + expectedTableData = Seq(createTableData("src_orc")) + ) + } + } + + test("Relation inside an EXISTS subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE EXISTS (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside an IN subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE col1 IN (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside a nested subquery expression") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = """ + SELECT + col1 + ( + SELECT 35 * ( + SELECT key FROM src LIMIT 1 + ) * col1 FROM VALUES (2) + ) + FROM + VALUES (1) + """, + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation from a file") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.json(f.getCanonicalPath) + checkResolveLogicalRelation( + sqlText = s"select id from json.`${f.getCanonicalPath}`", + expectedTableData = Seq( + RelationId( + multipartIdentifier = Seq("spark_catalog", "json", s"${f.getCanonicalPath}") + ) -> TestTableData( + name = s"file:${f.getCanonicalPath}", + schema = fileTableSchema + ) + ) + ) + }) + } + + test("Relation bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new 
AnalyzerBridgeState + analyzerBridgeState.relationsWithResolvedMetadata.put( + UnresolvedRelation(Seq("src")), + createUnresolvedCatalogRelation("src") + ) + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq(createTableData("src")), + analyzerBridgeState = Some(analyzerBridgeState) + ) + } + } + + test("Relation not bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new AnalyzerBridgeState + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq.empty, + analyzerBridgeState = Some(new AnalyzerBridgeState) + ) + } + } + + private def checkResolveUnresolvedCatalogRelation( + sqlText: String, + expectedTableData: Seq[(RelationId, TestTableData)], + analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = { + checkResolve( + sqlText, + expectedTableData, + (relation) => + relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.identifier.unquotedString, + (relation) => relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.schema, Review Comment: ```suggestion relation => relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.schema, ``` ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.resolver.{ + AnalyzerBridgeState, + BridgedRelationMetadataProvider, + MetadataResolver, + RelationId, + Resolver +} +import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.{FileResolver, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class MetadataResolverSuite extends QueryTest with SharedSparkSession with SQLTestUtils { + private val keyValueTableSchema = StructType( + Seq( + StructField("key", IntegerType, true), + StructField("value", StringType, true) + ) + ) + private val fileTableSchema = StructType( + Seq( + StructField("id", LongType, true) + ) + ) + + test("Single CSV relation") { + withTable("src_csv") { + spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_csv", + expectedTableData = Seq(createTableData("src_csv")) + ) + } + } + + test("Single ORC relation") { + withTable("src_orc") { + spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_orc", + expectedTableData = Seq(createTableData("src_orc")) + ) + } + } + + test("Relation inside an EXISTS subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE EXISTS (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside an IN subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE col1 IN (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside a nested subquery expression") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = """ + SELECT + col1 + ( + SELECT 35 * ( + SELECT key FROM src LIMIT 1 + ) * col1 FROM VALUES (2) + ) + FROM + VALUES (1) + """, + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation from a file") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.json(f.getCanonicalPath) + checkResolveLogicalRelation( + sqlText = s"select id from json.`${f.getCanonicalPath}`", + expectedTableData = Seq( + RelationId( + multipartIdentifier = Seq("spark_catalog", "json", s"${f.getCanonicalPath}") + ) -> TestTableData( + name = s"file:${f.getCanonicalPath}", + schema = fileTableSchema + ) + ) + ) + }) + } + + test("Relation bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new 
AnalyzerBridgeState + analyzerBridgeState.relationsWithResolvedMetadata.put( + UnresolvedRelation(Seq("src")), + createUnresolvedCatalogRelation("src") + ) + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq(createTableData("src")), + analyzerBridgeState = Some(analyzerBridgeState) + ) + } + } + + test("Relation not bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new AnalyzerBridgeState Review Comment: Is it used somewhere? If not, please, remove it. ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ResolverGuardSuite.scala: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql Review Comment: Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver' ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala: ########## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql Review Comment: Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver' ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala: ########## @@ -0,0 +1,658 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql Review Comment: Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver' ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileResolverSuite.scala: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} + +class FileResolverSuite extends QueryTest with SharedSparkSession { + private val tableSchema = StructType( + Seq( + StructField("id", LongType, true) + ) + ) Review Comment: `nullable` is true by default: ```suggestion private val tableSchema = new StructType().add("id", LongType) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceResolverSuite.scala: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.resolver.{MetadataResolver, Resolver} +import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} + +class DataSourceResolverSuite extends QueryTest with SharedSparkSession { + private val keyValueTableSchema = StructType( + Seq( + StructField("key", IntegerType, true), + StructField("value", StringType, true) + ) + ) + + test("CSV relation") { + withTable("src_csv") { + spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect() + + checkResolveOperator( + sqlText = "SELECT * FROM src_csv", + expectedTableName = "spark_catalog.default.src_csv", + expectedTableSchema = keyValueTableSchema + ) + } + } + + test("JSON relation") { + withTable("src_json") { + spark.sql("CREATE TABLE src_json (key INT, value STRING) USING JSON;").collect() + + checkResolveOperator( + sqlText = "SELECT * FROM src_json", + expectedTableName = "spark_catalog.default.src_json", + expectedTableSchema = keyValueTableSchema + ) + } + } + + test("PARQUET relation") { + withTable("src_parquet") { + spark.sql("CREATE TABLE src_parquet (key INT, value STRING) USING PARQUET;").collect() + + checkResolveOperator( + sqlText = "SELECT * FROM src_parquet", + expectedTableName = "spark_catalog.default.src_parquet", + expectedTableSchema = keyValueTableSchema + ) + } + } + + test("ORC relation") { + withTable("src_orc") { + spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect() + + checkResolveOperator( + sqlText = "SELECT * FROM src_orc", + expectedTableName = "spark_catalog.default.src_orc", + expectedTableSchema = keyValueTableSchema + ) + } + } + + private def checkResolveOperator( + sqlText: String, + expectedTableName: String, + expectedTableSchema: StructType) = { + val metadataResolver = new MetadataResolver( + spark.sessionState.catalogManager, + Resolver.createRelationResolution(spark.sessionState.catalogManager) + ) + val dataSourceResolver = new DataSourceResolver(spark) + + val unresolvedPlan = spark.sql(sqlText).queryExecution.logical + + metadataResolver.resolve(unresolvedPlan) + + val unresolvedRelations = unresolvedPlan.collect { + case unresolvedRelation: UnresolvedRelation => unresolvedRelation + } + assert(unresolvedRelations.size == 1) + + val partiallyResolvedRelation = metadataResolver + .getRelationWithResolvedMetadata(unresolvedRelations.head) + .get + .asInstanceOf[SubqueryAlias] + .child + assert(partiallyResolvedRelation.isInstanceOf[UnresolvedCatalogRelation]) + + var result = dataSourceResolver.resolveOperator(partiallyResolvedRelation) Review Comment: ```suggestion val result = dataSourceResolver.resolveOperator(partiallyResolvedRelation) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql Review Comment: Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver' ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ResolverSuite.scala: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.analysis.resolver.{Resolver, ResolverExtension} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.IntegerType + +class ResolverSuite extends QueryTest with SharedSparkSession { + private val col1Integer = AttributeReference("col1", IntegerType)() + + test("Node matched the extension") { + val resolver = createResolver( + Seq( + new NoopResolver, + new TestRelationResolver + ) + ) + + val result = resolver.lookupMetadataAndResolve( + Project( + Seq(UnresolvedAttribute("col1")), + TestRelation(resolutionDone = false, output = Seq(col1Integer)) + ) + ) + assert( + result == Project( + Seq(col1Integer), + TestRelation(resolutionDone = true, output = Seq(col1Integer)) + ) + ) + } + + test("Node didn't match the extension") { + val resolver = createResolver( + Seq( + new NoopResolver, + new TestRelationResolver + ) + ) + + checkError( + exception = intercept[AnalysisException]( + resolver.lookupMetadataAndResolve( + Project( + Seq(UnresolvedAttribute("col1")), + UnknownRelation(output = Seq(col1Integer)) + ) + ) + ), + condition = "UNSUPPORTED_SINGLE_PASS_ANALYZER_FEATURE", + parameters = Map( + "feature" -> "class org.apache.spark.sql.ResolverSuite$UnknownRelation operator resolution" + ) + ) + } + + test("Ambiguous extensions") { + val resolver = createResolver( + Seq( + new NoopResolver, + new TestRelationResolver, + new TestRelationBrokenResolver + ) + ) + + checkError( + exception = intercept[AnalysisException]( + resolver.lookupMetadataAndResolve( + Project( + Seq(UnresolvedAttribute("col1")), + 
TestRelation(resolutionDone = false, output = Seq(col1Integer)) + ) + ) + ), + condition = "AMBIGUOUS_RESOLVER_EXTENSION", + parameters = Map( + "operator" -> "org.apache.spark.sql.ResolverSuite$TestRelation", + "extensions" -> "TestRelationResolver, TestRelationBrokenResolver" + ) + ) + } + + private def createResolver(extensions: Seq[ResolverExtension] = Seq.empty): Resolver = { + new Resolver(spark.sessionState.catalogManager, extensions) + } + + private class TestRelationResolver extends ResolverExtension { + var timesCalled = 0 + + override def resolveOperator: PartialFunction[LogicalPlan, LogicalPlan] = { + case testNode: TestRelation if countTimesCalled() => + testNode.copy(resolutionDone = true) + } + + private def countTimesCalled(): Boolean = { + timesCalled += 1 + assert(timesCalled == 1) + true + } + } + + private class TestRelationBrokenResolver extends ResolverExtension { + override def resolveOperator: PartialFunction[LogicalPlan, LogicalPlan] = { + case testNode: TestRelation => + assert(false) + testNode + } + } + + private class NoopResolver extends ResolverExtension { + override def resolveOperator: PartialFunction[LogicalPlan, LogicalPlan] = { + case node: LogicalPlan if false => + assert(false) + node + } + } + + private case class TestRelation( + val resolutionDone: Boolean, Review Comment: Do you access this from somewhere? It seems `val` is not needed: ```suggestion resolutionDone: Boolean, ``` ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ExplicitlyUnsupportedResolverFeatureSuite.scala: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql Review Comment: Package name 'org.apache.spark.sql' does not correspond to the file path 'org.apache.spark.sql.analysis.resolver' ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.resolver.{ + AnalyzerBridgeState, + BridgedRelationMetadataProvider, + MetadataResolver, + RelationId, + Resolver +} +import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.{FileResolver, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class MetadataResolverSuite extends QueryTest with SharedSparkSession with SQLTestUtils { + private val keyValueTableSchema = StructType( + Seq( + StructField("key", IntegerType, true), + StructField("value", StringType, true) + ) + ) + private val fileTableSchema = StructType( + Seq( + StructField("id", LongType, true) + ) + ) + + test("Single CSV relation") { + withTable("src_csv") { + spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_csv", + expectedTableData = Seq(createTableData("src_csv")) + ) + } + } + + test("Single ORC relation") { + withTable("src_orc") { + spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_orc", + expectedTableData = Seq(createTableData("src_orc")) + ) + } + } + + test("Relation inside an EXISTS subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE EXISTS (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside an IN subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE col1 IN (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside a nested subquery expression") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = """ + SELECT + col1 + ( + SELECT 35 * ( + SELECT key FROM src LIMIT 1 + ) * col1 FROM VALUES (2) + ) + FROM + VALUES (1) + """, + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation from a file") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.json(f.getCanonicalPath) + checkResolveLogicalRelation( + sqlText = s"select id from json.`${f.getCanonicalPath}`", + expectedTableData = Seq( + RelationId( + multipartIdentifier = Seq("spark_catalog", "json", s"${f.getCanonicalPath}") + ) -> TestTableData( + name = s"file:${f.getCanonicalPath}", + schema = fileTableSchema + ) + ) + ) + }) + } + + test("Relation bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new 
AnalyzerBridgeState + analyzerBridgeState.relationsWithResolvedMetadata.put( + UnresolvedRelation(Seq("src")), + createUnresolvedCatalogRelation("src") + ) + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq(createTableData("src")), + analyzerBridgeState = Some(analyzerBridgeState) + ) + } + } + + test("Relation not bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new AnalyzerBridgeState + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq.empty, + analyzerBridgeState = Some(new AnalyzerBridgeState) + ) + } + } + + private def checkResolveUnresolvedCatalogRelation( + sqlText: String, + expectedTableData: Seq[(RelationId, TestTableData)], + analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = { + checkResolve( + sqlText, + expectedTableData, + (relation) => Review Comment: ```suggestion relation => ``` ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/DataSourceWithHiveResolver.scala: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{DataSourceResolver, LogicalRelation} + +/** + * [[DataSourceWithHiveResolver]] is a [[DataSourceResolver]] that additionally handles + * [[HiveTableRelation]] conversion using [[RelationConversions]]. + */ +class DataSourceWithHiveResolver(sparkSession: SparkSession, hiveCatalog: HiveSessionCatalog) + extends DataSourceResolver(sparkSession) { + val relationConversions = new RelationConversions(hiveCatalog) Review Comment: ```suggestion private val relationConversions = new RelationConversions(hiveCatalog) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/MetadataResolverSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.resolver.{ + AnalyzerBridgeState, + BridgedRelationMetadataProvider, + MetadataResolver, + RelationId, + Resolver +} +import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.{FileResolver, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class MetadataResolverSuite extends QueryTest with SharedSparkSession with SQLTestUtils { + private val keyValueTableSchema = StructType( + Seq( + StructField("key", IntegerType, true), + StructField("value", StringType, true) + ) + ) + private val fileTableSchema = StructType( + Seq( + StructField("id", LongType, true) + ) + ) + + test("Single CSV relation") { + withTable("src_csv") { + spark.sql("CREATE TABLE src_csv (key INT, value STRING) USING CSV;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_csv", + expectedTableData = Seq(createTableData("src_csv")) + ) + } + } + + test("Single ORC relation") { + withTable("src_orc") { + spark.sql("CREATE TABLE src_orc (key INT, value STRING) USING ORC;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src_orc", + expectedTableData = Seq(createTableData("src_orc")) + ) + } + } + + test("Relation inside an EXISTS subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE EXISTS (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside an IN subquery") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM VALUES (1) WHERE col1 IN (SELECT col1 FROM src)", + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation inside a nested subquery expression") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + checkResolveUnresolvedCatalogRelation( + sqlText = """ + SELECT + col1 + ( + SELECT 35 * ( + SELECT key FROM src LIMIT 1 + ) * col1 FROM VALUES (2) + ) + FROM + VALUES (1) + """, + expectedTableData = Seq(createTableData("src")) + ) + } + } + + test("Relation from a file") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.json(f.getCanonicalPath) + checkResolveLogicalRelation( + sqlText = s"select id from json.`${f.getCanonicalPath}`", + 
expectedTableData = Seq( + RelationId( + multipartIdentifier = Seq("spark_catalog", "json", s"${f.getCanonicalPath}") + ) -> TestTableData( + name = s"file:${f.getCanonicalPath}", + schema = fileTableSchema + ) + ) + ) + }) + } + + test("Relation bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new AnalyzerBridgeState + analyzerBridgeState.relationsWithResolvedMetadata.put( + UnresolvedRelation(Seq("src")), + createUnresolvedCatalogRelation("src") + ) + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq(createTableData("src")), + analyzerBridgeState = Some(analyzerBridgeState) + ) + } + } + + test("Relation not bridged from legacy Analyzer") { + withTable("src") { + spark.sql("CREATE TABLE src (key INT, value STRING) USING PARQUET;").collect() + + val analyzerBridgeState = new AnalyzerBridgeState + + checkResolveUnresolvedCatalogRelation( + sqlText = "SELECT * FROM src", + expectedTableData = Seq.empty, + analyzerBridgeState = Some(new AnalyzerBridgeState) + ) + } + } + + private def checkResolveUnresolvedCatalogRelation( + sqlText: String, + expectedTableData: Seq[(RelationId, TestTableData)], + analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = { + checkResolve( + sqlText, + expectedTableData, + (relation) => + relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.identifier.unquotedString, + (relation) => relation.asInstanceOf[UnresolvedCatalogRelation].tableMeta.schema, + analyzerBridgeState + ) + } + + private def checkResolveLogicalRelation( + sqlText: String, + expectedTableData: Seq[(RelationId, TestTableData)], + analyzerBridgeState: Option[AnalyzerBridgeState] = None): Unit = { + checkResolve( + sqlText, + expectedTableData, + (relation) => Review Comment: ```suggestion relation => ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolver.scala: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.resolver.{ + ExplicitlyUnsupportedResolverFeature, + ResolverExtension +} +import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.execution.streaming.StreamingRelation + +/** + * The [[DataSourceResolver]] is a [[Resolver]] extension that resolves nodes defined in the + * [[datasources]] package. 
We have it as an extension to avoid cyclic dependencies between + * [[resolver]] and [[datasources]] packages. + */ +class DataSourceResolver(sparkSession: SparkSession) extends ResolverExtension { + val findDataSourceTable = new FindDataSourceTable(sparkSession) Review Comment: Could be private: ```suggestion private val findDataSourceTable = new FindDataSourceTable(sparkSession) ```
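A quick illustration of the case-class point from the `RelationConversions` comment above. This is a standalone sketch; `Point` and `CaseClassApplyDemo` are made-up names for illustration, not code from this PR:

```scala
// Case classes get a compiler-generated companion `apply`,
// so instantiation does not need `new`.
case class Point(x: Int, y: Int)

object CaseClassApplyDemo extends App {
  val p1 = Point(1, 2)     // calls the synthesized Point.apply(1, 2)
  val p2 = new Point(1, 2) // also compiles, but the `new` is redundant
  assert(p1 == p2)         // case-class equality is structural
}
```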
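Similarly, on the `nullable` comment for `FileResolverSuite`: `StructType.add` and `StructField` both default `nullable` to `true`, so the compact form in the suggestion builds the same schema as the original explicit one. A minimal sketch against the Spark types API:

```scala
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Both schemas are equal, since `nullable` defaults to true in both APIs.
val viaAdd    = new StructType().add("id", LongType)
val viaFields = StructType(Seq(StructField("id", LongType)))
assert(viaAdd == viaFields)
```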
