davidm-db commented on code in PR #53530:
URL: https://github.com/apache/spark/pull/53530#discussion_r2693848671


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -144,6 +148,22 @@ declareHandlerStatement
     : DECLARE (CONTINUE | EXIT) HANDLER FOR conditionValues 
(beginEndCompoundBlock | statement | setStatementInsideSqlScript)
     ;
 
+declareCursorStatement
+    : DECLARE name=errorCapturingIdentifier (ASENSITIVE | INSENSITIVE)? CURSOR 
FOR query (FOR READ ONLY)?

Review Comment:
   are we using `errorCapturingIdentifier` here and `multipartIdentifier` in 
other statements to forbid declarations with dots, or is there any additional 
meaning to it? 



##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -144,6 +148,22 @@ declareHandlerStatement
     : DECLARE (CONTINUE | EXIT) HANDLER FOR conditionValues 
(beginEndCompoundBlock | statement | setStatementInsideSqlScript)
     ;
 
+declareCursorStatement
+    : DECLARE name=errorCapturingIdentifier (ASENSITIVE | INSENSITIVE)? CURSOR 
FOR query (FOR READ ONLY)?
+    ;
+
+openCursorStatement
+    : OPEN multipartIdentifier (USING (LEFT_PAREN params=namedExpressionSeq 
RIGHT_PAREN | params=namedExpressionSeq))?
+    ;
+
+fetchCursorStatement
+    : FETCH ((NEXT FROM) | FROM | NEXT)? cursorName=multipartIdentifier INTO 
targets=multipartIdentifierList

Review Comment:
   per ref spec it's `FETCH [ [ NEXT ] FROM ]` which maps to either:
   - `FETCH FROM` or
   - `FETCH NEXT FROM`
   
   here we have three options, so just double checking which one is expected?



##########
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala:
##########
@@ -53,6 +55,108 @@ class SqlScriptingExecutionContext extends 
SqlScriptingExecutionContextExtension
   def currentFrame: SqlScriptingExecutionFrame = frames.last
   def currentScope: SqlScriptingExecutionScope = currentFrame.currentScope
 
+  /**
+   * Find a cursor by its normalized name in the current scope and parent 
scopes.
+   * Implementation of SqlScriptingExecutionContextExtension API.
+   *
+   * Searches current frame first (respects shadowing), then script frame (for 
cross-frame access).
+   * This ensures cursors declared in handlers shadow cursors with the same 
name in the script.
+   */
+  override def findCursorByName(normalizedName: String): 
Option[CursorDefinition] = {

Review Comment:
   Do we understand the limitations of the current cursor resolution logic? The 
proper variable resolution logic is implemented in:
   - `VariableResolution#lookupVariable` and
   - `SqlScriptingLocalVariableManaged#get`
   and is lot more complex than this.
   
   I don't have an issue with going like this initially, but I think we need to 
know how we will address the issue with resolution. I would expect cursors to 
behave as any other variable, which at the moment isn't the case.
   
   This is especially important if/when we go down the way of adding similar 
constructs (like TableRef variables) - all of them would need to behave like 
variables, but they are not regular variables.
   If we do special casing like this for each of them, that would be bad. I 
think we need to create a mechanism to consider them as regular variables - 
i.e. special cases/classes that extend `VariableDefinition` maybe? This way we 
would have a unified mechanism for adding new types of var defs and it would 
fit into the variable resolution logic that exists today (hopefully with a very 
little amount of case matching).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{InternalRow, SqlScriptingContextManager}
+import org.apache.spark.sql.catalyst.catalog.VariableDefinition
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorReference, 
Expression, Literal, VariableReference}
+import org.apache.spark.sql.classic.Dataset
+import org.apache.spark.sql.errors.DataTypeErrorsBase
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.scripting.{CursorFetching, CursorOpened}
+
+/**
+ * Physical plan node for fetching from cursors.
+ *
+ * Transitions cursor from Opened to Fetching state on first fetch (creating 
result iterator),
+ * then fetches rows from the iterator on subsequent calls. Assigns fetched 
values to target
+ * variables with ANSI store assignment rules.
+ *
+ * @param cursor CursorReference resolved during analysis phase
+ * @param targetVariables Variables to fetch into
+ */
+case class FetchCursorExec(
+    cursor: Expression,
+    targetVariables: Seq[VariableReference]) extends LeafV2CommandExec with 
DataTypeErrorsBase {
+
+  override protected def run(): Seq[InternalRow] = {
+    // Extract CursorReference from the resolved cursor expression
+    val cursorRef = cursor.asInstanceOf[CursorReference]
+
+    val scriptingContext = 
CursorCommandUtils.getScriptingContext(cursorRef.definition.name)
+    val variableManager = 
SqlScriptingContextManager.get().get.getVariableManager
+
+    // Get current cursor state
+    val currentState = scriptingContext.getCursorState(cursorRef).getOrElse(
+      throw new AnalysisException(
+        errorClass = "CURSOR_NOT_FOUND",
+        messageParameters = Map("cursorName" -> 
toSQLId(cursorRef.definition.name))))
+
+    // Get or create iterator based on current state
+    val (iterator, analyzedQuery) = currentState match {
+      case CursorOpened(query) =>
+        // First fetch - create iterator and transition to Fetching state
+        val df = Dataset.ofRows(
+          session.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+          query)
+        val iter = df.toLocalIterator()
+        scriptingContext.updateCursorState(
+          cursorRef.normalizedName,
+          cursorRef.scopeLabel,
+          CursorFetching(query, iter))
+        (iter, query)
+
+      case CursorFetching(query, iter) =>
+        // Subsequent fetch - use existing iterator
+        (iter, query)
+
+      case _ =>
+        throw new AnalysisException(
+          errorClass = "CURSOR_NOT_OPEN",
+          messageParameters = Map("cursorName" -> 
toSQLId(cursorRef.definition.name)))
+    }
+
+    // Get next row from iterator
+    if (!iterator.hasNext) {
+      throw new AnalysisException(
+        errorClass = "CURSOR_NO_MORE_ROWS",
+        messageParameters = Map("cursorName" -> 
toSQLId(cursorRef.definition.name)))
+    }
+
+    val externalRow = iterator.next()
+
+    // Convert Row to InternalRow for processing
+    val schema = org.apache.spark.sql.catalyst.types.DataTypeUtils
+      .fromAttributes(analyzedQuery.output)
+    val converter = org.apache.spark.sql.catalyst.CatalystTypeConverters
+      .createToCatalystConverter(schema)
+    val currentRow = converter(externalRow).asInstanceOf[InternalRow]

Review Comment:
   I spent quite some time to figure this out and I think we might be (not sure 
how much?) inefficient here (similar holds for the `ForStatementExec` but there 
it is exposed in a bit different manner). cc @cloud-fan to help here, I'm not 
sure if I'm correct.
   
   TLDR: by calling `toLocalIterator` we are basically converting from 
`InternalRow` to `Row`, just to take it then and convert back to `InternalRow`:
   <img width="989" height="292" alt="image" 
src="https://github.com/user-attachments/assets/5fdf8b52-3839-4697-a3b3-2578d95e2c80";
 />
   
   And it looks like the `executeToIterator` is actually behind the 
implementation of `toLocalIterator`, so we do still preserve the 
partition-by-partition collection of results:
   <img width="849" height="421" alt="image" 
src="https://github.com/user-attachments/assets/f78540c1-6d35-4aae-97a3-b57c5e144f7f";
 />
   
   Similar thing happens in `ForStatementExec`, but there the we use the 
"public" values from `Row`, just to get them back to the `InternalRow` format 
through the `SetVariable` statement. There, we can avoid creating `SetVariable` 
statements in a similar manner we do here, by directly updating the variable 
values.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCursors.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SqlScriptingContextManager
+import org.apache.spark.sql.catalyst.expressions.{CursorReference, 
UnresolvedCursor}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_CURSOR
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolves [[UnresolvedCursor]] expressions to [[CursorReference]] 
expressions.
+ * This rule:
+ * 1. Normalizes cursor names based on case sensitivity configuration
+ * 2. Separates qualified cursor names (label.cursor) into scope label and 
cursor name
+ * 3. Looks up the cursor definition from the scripting context
+ * 4. Fails early if cursor is not found
+ */
+class ResolveCursors extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveExpressionsWithPruning(
+    _.containsPattern(UNRESOLVED_CURSOR)) {
+    case uc: UnresolvedCursor =>
+      resolveCursor(uc)
+  }
+
+  private def resolveCursor(uc: UnresolvedCursor): CursorReference = {
+    val nameParts = uc.nameParts
+    val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+    // Parser already validates this, so we can assert
+    assert(nameParts.length <= 2,
+      s"Cursor reference has too many parts: ${nameParts.mkString(".")}")
+
+    // Split qualified name into scope label and cursor name
+    val (scopeLabel, cursorName) = if (nameParts.length == 2) {
+      // Qualified cursor: "label.cursor"
+      (Some(nameParts.head), nameParts.last)
+    } else {
+      // Unqualified cursor: "cursor"
+      (None, nameParts.head)
+    }
+
+    // Normalize cursor name and scope label based on case sensitivity
+    val normalizedName = if (caseSensitive) {
+      cursorName
+    } else {
+      cursorName.toLowerCase(Locale.ROOT)
+    }
+
+    val normalizedScopeLabel = scopeLabel.map { label =>
+      if (caseSensitive) {
+        label
+      } else {
+        label.toLowerCase(Locale.ROOT)
+      }
+    }
+
+    // Look up cursor definition from scripting context using the extension API
+    val contextOpt = SqlScriptingContextManager.get().map(_.getContext)
+
+    val cursorDefOpt = contextOpt.flatMap { context =>
+      // Use the SqlScriptingExecutionContextExtension API for cursor lookup
+      normalizedScopeLabel match {
+        case Some(label) =>
+          // Qualified cursor: look up in specific labeled scope
+          context.findCursorInScope(label, normalizedName)
+        case None =>
+          // Unqualified cursor: search current and parent scopes
+          context.findCursorByName(normalizedName)
+      }
+    }
+
+    // If cursor not found and we're in a scripting context, fail immediately
+    if (contextOpt.isDefined && cursorDefOpt.isEmpty) {

Review Comment:
   nit: why do we need the `contextOpt.isDefined` when we allow cursors only in 
scripts? even if we allowed outside of scripts, the question would still be the 
same.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.VariableReference
+import org.apache.spark.sql.catalyst.plans.logical.{FetchCursor, LogicalPlan, 
SingleStatement}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import 
org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError
+
+/**
+ * Resolves the target SQL variables in FetchCursor command.
+ * Variables can be either scripting local variables or session variables.
+ */
+class ResolveFetchCursor(val catalogManager: CatalogManager) extends 
Rule[LogicalPlan]
+  with ColumnResolutionHelper {
+  // VariableResolution looks up both scripting local variables (via 
SqlScriptingContextManager)
+  // and session variables (via tempVariableManager), checking local variables 
first.
+  private val variableResolution = new 
VariableResolution(catalogManager.tempVariableManager)
+
+  /**
+   * Checks for duplicate variable names and throws an exception if found.
+   * Names are normalized when the variables are created.
+   * No need for case insensitive comparison here.
+   */
+  private def checkForDuplicateVariables(variables: Seq[VariableReference]): 
Unit = {
+    val dups = variables.groupBy(_.identifier).filter(kv => kv._2.length > 1)
+    if (dups.nonEmpty) {
+      throw new AnalysisException(
+        errorClass = "DUPLICATE_ASSIGNMENTS",
+        messageParameters = Map("nameList" ->
+          dups.keys.map(key => toSQLId(key.name())).mkString(", ")))
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    // Resolve FetchCursor wrapped in SingleStatement
+    case s @ SingleStatement(fetchCursor: FetchCursor)
+        if !fetchCursor.targetVariables.forall(_.resolved) =>
+      val resolvedVars = fetchCursor.targetVariables.map {
+        case u: UnresolvedAttribute =>
+          variableResolution.lookupVariable(
+            nameParts = u.nameParts
+          ) match {
+            case Some(variable) => variable.copy(canFold = false)
+            case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", 
"SESSION"))
+          }
+
+        case other => throw SparkException.internalError(
+          "Unexpected target variable expression in FetchCursor: " + other)
+      }
+
+      // Check for duplicates immediately after resolution
+      checkForDuplicateVariables(resolvedVars)

Review Comment:
   exactly the same as in the `case` below - let's at least create a helper 
function



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -1037,6 +1037,48 @@
     },
     "sqlState" : "21S01"
   },
+  "CURSOR_ALREADY_EXISTS" : {

Review Comment:
   Does it make sense to create a main "CURSOR" class and subclasses for all 
exceptions? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.errors.DataTypeErrorsBase
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.scripting.{CursorClosed, CursorDeclared, 
CursorOpened}
+
+/**
+ * Physical plan node for opening cursors.
+ *
+ * Transitions cursor from Declared to Opened state by:
+ * 1. Parsing the cursor's SQL query text to a LogicalPlan
+ * 2. Binding parameters (if USING clause is provided)
+ * 3. Analyzing the query (semantic analysis, resolution, type checking)
+ *
+ * Does not execute the query or create result iterator - that happens on 
first FETCH.
+ *
+ * @param cursor CursorReference resolved during analysis phase
+ * @param args Parameter expressions from USING clause (for parameterized 
cursors)
+ * @param paramNames Names for each parameter (empty string for positional 
parameters)
+ */
+case class OpenCursorExec(

Review Comment:
   Similar to my earlier comment about "SELECT INTO" semantics, 
`OpenCursorExec` does have a lot of similarities/overlapping with 
`ExecuteImmediate`:
   - `USING` clause
   - how nested queries are executed
   
   I would like to see that unified into a trait or something similar. I think 
there's a huge value to it, because these specific parts of the code (related 
to variables, parameters, etc) have evolved a lot in the past period and it's 
easier to refactor now rather than later, especially considering it is obvious 
even now that it should happen.



##########
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala:
##########
@@ -314,6 +314,12 @@ class CompoundBodyExec(
   /** Used to stop the iteration in cases when LEAVE statement is encountered. 
*/
   private var stopIteration = false
 
+  /**
+   * Flag to prevent advancing curr after an exception was thrown and handled 
by a CONTINUE handler.
+   * When true, the next call to next() should return curr without advancing.
+   */
+  private[scripting] var returnCurrentWithoutAdvancing = false

Review Comment:
   I think I wouldn't be comfortable with the change about 
`returnCurrentWithoutAdvancing` going in and wouldn't like the PR to be merged 
before this is properly understood and resolved.
   
   The comments are a bit contradictory - for example, bolded parts here: "If 
we need to return current **without advancing** (after CONTINUE handler), do 
so, **advance curr**, and clear the flag". 
   The semantic of the change, from what I'm getting (but I don't think the 
logic is commented out well), is that we don't want the `curr` to advance after 
we exit the `CONTINUE HANDLER`, but we do it ourselves. The existing logic 
should also behave the expected way.
   
   If we have some bug, let's try to figure it out and address it properly. 
This seems like a LLM special casing something, but it's not clearly explained 
why and I don't understand it from looking at the code.
   
   Do we have an example of the query that fails without this change? Can you 
share it so we can look into it and figure out if there is any bug? Honestly 
the comments about this change sound like the behavior that should already 
exist, so I wouldn't expect we need this much special casing.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/DeclareCursorExec.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorDefinition}
+import org.apache.spark.sql.errors.DataTypeErrorsBase
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.scripting.CursorDeclared
+
+/**
+ * Physical plan node for declaring cursors.
+ *
+ * Creates a cursor definition and initializes it in the Declared state.
+ * The cursor query is stored as SQL text and is not parsed or analyzed until 
OPEN time.
+ * This allows parameter markers to be preserved and bound correctly at OPEN.
+ *
+ * @param cursorName Name of the cursor
+ * @param queryText Original SQL text of the cursor query (with parameter 
markers preserved)
+ * @param asensitive Whether the cursor is ASENSITIVE (sensitivity to 
underlying data changes,
+ *                   not case sensitivity). Currently all cursors are 
effectively INSENSITIVE.
+ */
+case class DeclareCursorExec(
+    cursorName: String,
+    queryText: String,
+    asensitive: Boolean) extends LeafV2CommandExec with DataTypeErrorsBase {
+
+  override protected def run(): Seq[InternalRow] = {
+    val scriptingContext = CursorCommandUtils.getScriptingContext(cursorName)
+    val currentScope = scriptingContext.currentScope
+
+    // Normalize cursor name based on case sensitivity configuration
+    val normalizedName = if (session.sessionState.conf.caseSensitiveAnalysis) {
+      cursorName
+    } else {
+      cursorName.toLowerCase(java.util.Locale.ROOT)
+    }
+
+    // Validate cursor doesn't already exist in current scope
+    if (currentScope.cursors.contains(normalizedName)) {
+      throw new AnalysisException(
+        errorClass = "CURSOR_ALREADY_EXISTS",
+        messageParameters = Map("cursorName" -> toSQLId(cursorName)))
+    }
+
+    // Create immutable cursor definition with name and SQL text
+    // Query parsing and analysis is deferred until OPEN time
+    val cursorDef = CursorDefinition(
+      name = cursorName,

Review Comment:
   I don't quite get the reason why we are mixing usage of `normalizedName` and 
`cursorName`? For example, IIRC variable definitions always use 
`normalizedName`. Why do we do it differently here?
   
   I see that @cloud-fan had some similar comments, but also don't understand 
how they were addressed.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -6581,6 +6583,120 @@ class AstBuilder extends DataTypeAstBuilder
     )
   }
 
+  /**
+   * Create a [[DeclareCursor]] command wrapped in SingleStatement.
+   *
+   * For example:
+   * {{{
+   *   DECLARE cursor_name CURSOR FOR SELECT * FROM table;
+   * }}}
+   */
+  override def visitDeclareCursorStatement(
+      ctx: DeclareCursorStatementContext): LogicalPlan = withOrigin(ctx) {
+    if (!conf.getConf(SQLConf.SQL_SCRIPTING_CURSOR_ENABLED)) {
+      throw SqlScriptingErrors.cursorNotSupported(CurrentOrigin.get)
+    }
+
+    val cursorName = getIdentifierText(ctx.name)
+    // Extract original SQL text to preserve parameter markers
+    val queryText = getOriginalText(ctx.query())
+
+    val asensitive = if (ctx.INSENSITIVE() != null) false else true
+    SingleStatement(DeclareCursor(cursorName, queryText, asensitive))

Review Comment:
   why do we wrap `DeclareCursor` into `SingleStatement` here? It should 
already be handled by `visitCompoundBodyImp` - I think this way we might end up 
with
   ```
   SingleStatement(SingleStatement(DeclareCursor, ...), ...)
   ```
   aside from it being unnecessary, not sure if there might be any downsides to 
it from the top of my mind.
   
   cc @miland-db to double check



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.errors.DataTypeErrorsBase
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.scripting.{CursorClosed, CursorDeclared, 
CursorOpened}
+
+/**
+ * Physical plan node for opening cursors.
+ *
+ * Transitions cursor from Declared to Opened state by:
+ * 1. Parsing the cursor's SQL query text to a LogicalPlan
+ * 2. Binding parameters (if USING clause is provided)
+ * 3. Analyzing the query (semantic analysis, resolution, type checking)
+ *
+ * Does not execute the query or create result iterator - that happens on 
first FETCH.
+ *
+ * @param cursor CursorReference resolved during analysis phase
+ * @param args Parameter expressions from USING clause (for parameterized 
cursors)
+ * @param paramNames Names for each parameter (empty string for positional 
parameters)
+ */
+case class OpenCursorExec(
+    cursor: Expression,
+    args: Seq[Expression] = Seq.empty,
+    paramNames: Seq[String] = Seq.empty) extends LeafV2CommandExec with 
DataTypeErrorsBase {
+
+  override protected def run(): Seq[InternalRow] = {
+    // Extract CursorReference from the resolved cursor expression
+    val cursorRef = cursor.asInstanceOf[CursorReference]
+
+    val scriptingContext = 
CursorCommandUtils.getScriptingContext(cursorRef.definition.name)
+
+    // Get cursor definition from CursorReference (looked up during analysis)
+    val cursorDef = cursorRef.definition
+
+    // Get current cursor state and validate it's in Declared state
+    val currentState = scriptingContext.getCursorState(cursorRef).getOrElse(
+      throw new AnalysisException(
+        errorClass = "CURSOR_NOT_FOUND",
+        messageParameters = Map("cursorName" -> 
toSQLId(cursorRef.definition.name))))
+
+    currentState match {
+      case CursorDeclared | CursorClosed => // Expected states - new or closed 
cursor
+      case _ =>
+        throw new AnalysisException(
+          errorClass = "CURSOR_ALREADY_OPEN",
+          messageParameters = Map("cursorName" -> 
toSQLId(cursorRef.definition.name)))
+    }
+
+    // Parse and analyze the query from the stored SQL text
+    // For both parameterized and non-parameterized cursors, we parse at OPEN 
time
+    val analyzedQuery = if (args.nonEmpty) {
+      // Parameterized cursor: parse with bound parameters
+      executeParameterizedQuery(cursorDef.queryText, args)
+    } else {
+      // Non-parameterized cursor: parse without parameters
+      val df = session.asInstanceOf[org.apache.spark.sql.classic.SparkSession]
+        .sql(cursorDef.queryText)
+      df.queryExecution.analyzed
+    }
+
+    // Transition cursor state to Opened
+    scriptingContext.updateCursorState(
+      cursorRef.normalizedName,
+      cursorRef.scopeLabel,
+      CursorOpened(analyzedQuery))
+
+    Nil
+  }
+
+  /**
+   * Executes a parameterized query by parsing the SQL text with bound 
parameters.
+   * This uses the same parameter binding mechanism as EXECUTE IMMEDIATE.
+   *
+   * @param queryText The SQL query text with parameter markers (? or :name)
+   * @param args Parameter expressions to bind
+   * @return The analyzed logical plan with parameters bound
+   */
+  private def executeParameterizedQuery(
+      queryText: String,
+      args: Seq[Expression]): LogicalPlan = {
+    val (paramValues, paramNames) = buildUnifiedParameters(args)
+
+    // Use session.sql() with parameters (same as EXECUTE IMMEDIATE)
+    val df = session.asInstanceOf[org.apache.spark.sql.classic.SparkSession]
+      .sql(queryText, paramValues, paramNames)
+
+    df.queryExecution.analyzed
+  }
+
+  /**
+   * Builds parameter arrays for the session.sql() API.
+   * This mirrors the exact logic in EXECUTE IMMEDIATE to ensure identical 
behavior.

Review Comment:
   I think this example can be used to reflect on my previous thoughts. We have 
exactly the same ref spec for `EXECUTE IMMEDIATE` and `OPEN CURSOR`, with 
respect to `USING` clause. But our code is different. For cursors, we do 
`Alias` logic in `AstBuilder` and for execute immediate we do it during 
analysis (in `ResolveExecuteImmediate`). While this is a simple example, it 
paints the picture how this code can easily diverge even though the logic is 
completely the same (if I'm not misunderstanding anything).



##########
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala:
##########
@@ -115,7 +295,13 @@ class SqlScriptingExecutionFrame(
   def enterScope(
       label: String,
       triggerToExceptionHandlerMap: TriggerToExceptionHandlerMap): Unit = {
-    scopes.append(new SqlScriptingExecutionScope(label, 
triggerToExceptionHandlerMap))
+    // Normalize label for case-insensitive lookups
+    val normalizedLabel = if (SQLConf.get.caseSensitiveAnalysis) {
+      label
+    } else {
+      label.toLowerCase(Locale.ROOT)
+    }

Review Comment:
   this is not needed, labels are always lowercased in 
`ParserUtils#enterLabeledScope`. Not sure why we did it this way, maybe we need 
to fix it to account for `SQLConf.get.caseSensitiveAnalysis`.
   
   Anyways, cc @miland-db to double check this, I think you added the labels 
logic so you might recall why exactly we did it this way and whether it makes 
sense to take `SQLConf.get.caseSensitiveAnalysis` into account.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -6581,6 +6583,120 @@ class AstBuilder extends DataTypeAstBuilder
     )
   }
 
+  /**
+   * Create a [[DeclareCursor]] command wrapped in SingleStatement.
+   *
+   * For example:
+   * {{{
+   *   DECLARE cursor_name CURSOR FOR SELECT * FROM table;
+   * }}}
+   */
+  override def visitDeclareCursorStatement(
+      ctx: DeclareCursorStatementContext): LogicalPlan = withOrigin(ctx) {
+    if (!conf.getConf(SQLConf.SQL_SCRIPTING_CURSOR_ENABLED)) {
+      throw SqlScriptingErrors.cursorNotSupported(CurrentOrigin.get)
+    }
+
+    val cursorName = getIdentifierText(ctx.name)
+    // Extract original SQL text to preserve parameter markers
+    val queryText = getOriginalText(ctx.query())
+
+    val asensitive = if (ctx.INSENSITIVE() != null) false else true
+    SingleStatement(DeclareCursor(cursorName, queryText, asensitive))

Review Comment:
   same holds for other `visit*CursorStatement` functions



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.VariableReference
+import org.apache.spark.sql.catalyst.plans.logical.{FetchCursor, LogicalPlan, 
SingleStatement}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import 
org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError
+
+/**
+ * Resolves the target SQL variables in FetchCursor command.
+ * Variables can be either scripting local variables or session variables.
+ */
+class ResolveFetchCursor(val catalogManager: CatalogManager) extends 
Rule[LogicalPlan]
+  with ColumnResolutionHelper {
+  // VariableResolution looks up both scripting local variables (via 
SqlScriptingContextManager)
+  // and session variables (via tempVariableManager), checking local variables 
first.
+  private val variableResolution = new 
VariableResolution(catalogManager.tempVariableManager)
+
+  /**
+   * Checks for duplicate variable names and throws an exception if found.
+   * Names are normalized when the variables are created.
+   * No need for case insensitive comparison here.
+   */
+  private def checkForDuplicateVariables(variables: Seq[VariableReference]): 
Unit = {

Review Comment:
   we are starting to duplicate a lot of code related to variables, and omit it 
where it makes sense.
   
   for example, this is the same as in `ResolveSetVariable` but we don't have 
the same check in `ExecuteImmediate` (or I might be missing it). anyways, all 
of those statements follow the "SELECT INTO" semantics, and I think they should 
behave consistently.
   
   maybe we should create a common trait for things related to "SELECT INTO" 
semantics? 
   
   we don't have to do it in this PR though, but if we don't do it then one of 
the outputs needs to be at least a work item to track it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to