vladimirg-db commented on code in PR #49029:
URL: https://github.com/apache/spark/pull/49029#discussion_r1873712410


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import org.apache.spark.sql.catalyst.{QueryPlanningTracker, SQLConfHelper}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, Analyzer}
+import org.apache.spark.sql.catalyst.plans.NormalizePlan
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, 
LogicalPlan}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * The HybridAnalyzer routes the unresolved logical plan between the legacy 
Analyzer and
+ * a single-pass Analyzer when the query that we are processing is being run 
from unit tests
+ * depending on the testing flags set and the structure of this unresolved 
logical plan:
+ *   - If the "spark.sql.analyzer.singlePassResolver.soloRunEnabled" is 
"true", the
+ *      [[HybridAnalyzer]] will unconditionally run the single-pass Analyzer, 
which would
+ *      usually result in some unexpected behavior and failures. This flag is 
used only for
+ *      development.
+ *   - If the "spark.sql.analyzer.singlePassResolver.dualRunEnabled" is 
"true", the
+ *      [[HybridAnalyzer]] will invoke the legacy analyzer and optionally 
_also_ the fixed-point
+ *      one depending on the structure of the unresolved plan. This decision 
is based on which
+ *      features are supported by the single-pass Analyzer, and the checking 
is implemented in
+ *      the [[ResolverGuard]]. After that we validate the results using the 
following
+ *      logic:
+ *        - If the fixed-point Analyzer fails and the single-pass one 
succeeds, we throw an
+ *          appropriate exception (please check the
+ *          [[QueryCompilationErrors.fixedPointFailedSinglePassSucceeded]] 
method)
+ *        - If both the fixed-point and the single-pass Analyzers failed, we 
throw the exception
+ *          from the fixed-point Analyzer.
+ *        - If the single-pass Analyzer failed, we throw an exception from its 
failure.
+ *        - If both the fixed-point and the single-pass Analyzers succeeded, 
we compare the logical
+ *          plans and output schemas, and return the resolved plan from the 
fixed-point Analyzer.
+ *   - Otherwise we run the legacy analyzer.
+ * */
+class HybridAnalyzer(legacyAnalyzer: Analyzer, resolverGuard: ResolverGuard, 
resolver: Resolver)
+    extends SQLConfHelper {
+
+  def apply(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+    val dualRun =
+      conf.getConf(SQLConf.ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER) 
&&
+      resolverGuard.apply(plan)
+
+    withTrackedAnalyzerBridgeState(dualRun) {
+      if (dualRun) {
+        resolveInDualRun(plan, tracker)
+      } else if (conf.getConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED)) {
+        resolveInSinglePass(plan)
+      } else {
+        resolveInFixedPoint(plan, tracker)
+      }
+    }
+  }
+
+  /**
+   * Call `body` in the context of tracked [[AnalyzerBridgeState]]. Set the 
new bridge state
+   * depending on whether we are in dual-run mode or not:
+   * - If [[dualRun]] is true, create and set a new [[AnalyzerBridgeState]].
+   * - Otherwise, reset [[AnalyzerBridgeState]].
+   *
+   * Finally, set the bridge state back to the previous one after the `body` 
is executed to avoid
+   * disrupting the possible upper-level [[Analyzer]] invocation in case it's 
recursive
+   * [[Analyzer]] call.
+   * */
+  private def withTrackedAnalyzerBridgeState(dualRun: Boolean)(
+      body: => LogicalPlan): LogicalPlan = {
+    val prevSinglePassResolverBridgeState = 
AnalysisContext.get.getSinglePassResolverBridgeState
+
+    AnalysisContext.get.setSinglePassResolverBridgeState(if (dualRun) {
+      Some(new AnalyzerBridgeState)
+    } else {
+      None
+    })
+
+    try {
+      body
+    } finally {
+      
AnalysisContext.get.setSinglePassResolverBridgeState(prevSinglePassResolverBridgeState)
+    }
+  }
+
+  /**
+   * This method is used to run both the legacy Analyzer and single-pass 
Analyzer,
+   * and then compare the results or check the errors. For more context please 
check the
+   * [[HybridAnalyzer]] scaladoc.
+   * */
+  private def resolveInDualRun(plan: LogicalPlan, tracker: 
QueryPlanningTracker): LogicalPlan = {
+    var fixedPointException: Option[Throwable] = None
+    val fixedPointResult = try {
+      Some(resolveInFixedPoint(plan, tracker))
+    } catch {
+      case e: Throwable =>

Review Comment:
   Good point, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to