MaxGekk commented on code in PR #49029: URL: https://github.com/apache/spark/pull/49029#discussion_r1870308778
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +import org.apache.spark.sql.catalyst.{QueryPlanningTracker, SQLConfHelper} +import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, Analyzer} +import org.apache.spark.sql.catalyst.plans.NormalizePlan +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf + +/** + * The HybridAnalyzer routes the unresolved logical plan between the legacy Analyzer and + * a single-pass Analyzer when the query that we are processing is being run from unit tests + * depending on the testing flags set and the structure of this unresolved logical plan: + * - If the "spark.sql.analyzer.singlePassResolver.soloRunEnabled" is "true", the + * [[HybridAnalyzer]] will unconditionally run the single-pass Analyzer, which would + * usually result in some unexpected behavior and failures. This flag is used only for + * development. 
+ * - If the "spark.sql.analyzer.singlePassResolver.dualRunEnabled" is "true", the + * [[HybridAnalyzer]] will invoke the legacy analyzer and optionally _also_ the single-pass + * one depending on the structure of the unresolved plan. This decision is based on which + * features are supported by the single-pass Analyzer, and the checking is implemented in + * the [[ResolverGuard]]. After that we validate the results using the following + * logic: + * - If the fixed-point Analyzer fails and the single-pass one succeeds, we throw an + * appropriate exception (please check the + * [[QueryCompilationErrors.fixedPointFailedSinglePassSucceeded]] method) + * - If both the fixed-point and the single-pass Analyzers failed, we throw the exception + * from the fixed-point Analyzer. + * - If the single-pass Analyzer failed, we throw an exception from its failure. + * - If both the fixed-point and the single-pass Analyzers succeeded, we compare the logical + * plans and output schemas, and return the resolved plan from the fixed-point Analyzer. + * - Otherwise we run the legacy analyzer. + * */ +class HybridAnalyzer(legacyAnalyzer: Analyzer, resolverGuard: ResolverGuard, resolver: Resolver) + extends SQLConfHelper { + + def apply(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { + val dualRun = + conf.getConf(SQLConf.ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER) && + resolverGuard.apply(plan) + + withTrackedAnalyzerBridgeState(dualRun) { + if (dualRun) { + resolveInDualRun(plan, tracker) + } else if (conf.getConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED)) { + resolveInSinglePass(plan) + } else { + resolveInFixedPoint(plan, tracker) + } + } + } + + /** + * Call `body` in the context of tracked [[AnalyzerBridgeState]]. Set the new bridge state + * depending on whether we are in dual-run mode or not: + * - If [[dualRun]] is true, create and set a new [[AnalyzerBridgeState]]. + * - Otherwise, reset [[AnalyzerBridgeState]]. 
+ * + * Finally, set the bridge state back to the previous one after the `body` is executed to avoid + * disrupting the possible upper-level [[Analyzer]] invocation in case it's recursive + * [[Analyzer]] call. + * */ + private def withTrackedAnalyzerBridgeState(dualRun: Boolean)( + body: => LogicalPlan): LogicalPlan = { + val prevSinglePassResolverBridgeState = AnalysisContext.get.getSinglePassResolverBridgeState + + AnalysisContext.get.setSinglePassResolverBridgeState(if (dualRun) { + Some(new AnalyzerBridgeState) + } else { + None + }) + + try { + body + } finally { + AnalysisContext.get.setSinglePassResolverBridgeState(prevSinglePassResolverBridgeState) + } + } + + /** + * This method is used to run both the legacy Analyzer and single-pass Analyzer, + * and then compare the results or check the errors. For more context please check the + * [[HybridAnalyzer]] scaladoc. + * */ + private def resolveInDualRun(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { + var fixedPointException: Option[Throwable] = None + val fixedPointResult = try { + Some(resolveInFixedPoint(plan, tracker)) + } catch { + case e: Throwable => Review Comment: How about catching only `NonFatal`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
