miland-db commented on code in PR #53530:
URL: https://github.com/apache/spark/pull/53530#discussion_r2691066901
##########
sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala:
##########
@@ -852,4 +852,21 @@ private[sql] object QueryParsingErrors extends
DataTypeErrorsBase {
messageParameters = Map("parameterMarker" -> ctx.getText),
ctx = ctx)
}
+
+ /**
+ * Throws an exception when a cursor reference has more than one qualifier.
+ * Valid: cursor or label.cursor
+ * Invalid: a.b.cursor
+ *
+ * @param cursorName
+ * The fully qualified cursor name with multiple qualifiers
+ * @throws ParseException
+ * Always throws this exception
+ */
+ def cursorInvalidQualifierError(cursorName: String): Nothing = {
+ throw new ParseException(
+ errorClass = "CURSOR_REFERENCE_INVALID_QUALIFIER",
+ messageParameters = Map("cursorName" -> cursorName),
Review Comment:
Should we use `toSQLId(cursorName)` here?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala:
##########
@@ -97,6 +97,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.ResolveLateralColumnAliasReference" ::
"org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
"org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" ::
+ "org.apache.spark.sql.catalyst.analysis.ResolveFetchCursor" ::
Review Comment:
Why does `ResolveCursors` not need to be here?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -1927,6 +1927,55 @@ case class SetVariable(
copy(sourceQuery = newChild)
}
+/**
+ * The logical plan of the DECLARE CURSOR statement.
+ *
+ * The queryText is stored to support both parameterized and non-parameterized
cursors.
+ * The query is parsed and analyzed when the cursor is declared at execution
time.
+ *
+ * @param cursorName Name of the cursor
+ * @param queryText The original SQL text of the query (preserves parameter
markers)
+ * @param asensitive Whether the cursor is ASENSITIVE or INSENSITIVE
+ */
+case class DeclareCursor(
+ cursorName: String,
+ queryText: String,
+ asensitive: Boolean = true) extends LeafCommand
+
+/**
+ * The logical plan of the OPEN cursor command.
+ *
+ * @param cursor Cursor reference (UnresolvedCursor during parsing,
CursorReference after analysis)
+ * @param args Parameter expressions from USING clause
+ * @param paramNames Names for each parameter (empty string "" for positional
parameters)
+ */
+case class OpenCursor(
+ cursor: Expression,
+ args: Seq[Expression] = Seq.empty,
+ paramNames: Seq[String] = Seq.empty) extends LeafCommand {
+
+ assert(args.length == paramNames.length,
+ s"args.length (${args.length}) must equal paramNames.length
(${paramNames.length})")
+}
+
+/**
+ * The logical plan of the FETCH cursor command.
+ *
+ * @param cursor Cursor reference (UnresolvedCursor during parsing,
CursorReference after analysis)
+ * @param targetVariables Target variables to fetch into
+ */
+case class FetchCursor(
+ cursor: Expression,
+ targetVariables: Seq[Expression]) extends LeafCommand
+
+/**
+ * The logical plan of the CLOSE cursor command.
+ *
+ * @param cursor Cursor reference (UnresolvedCursor during parsing,
CursorReference after analysis)
+ */
+case class CloseCursor(
+ cursor: Expression) extends LeafCommand
Review Comment:
nit: This can be a single line.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{InternalRow, SqlScriptingContextManager}
+import org.apache.spark.sql.catalyst.catalog.VariableDefinition
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
Literal, VariableReference}
+import org.apache.spark.sql.classic.Dataset
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.scripting.{CursorFetching, CursorOpened}
+
+/**
+ * Physical plan node for fetching from cursors.
+ *
+ * Transitions cursor from Opened to Fetching state on first fetch (creating
result iterator),
+ * then fetches rows from the iterator on subsequent calls. Assigns fetched
values to target
+ * variables with ANSI store assignment rules.
+ *
+ * @param cursor CursorReference resolved during analysis phase
+ * @param targetVariables Variables to fetch into
+ */
+case class FetchCursorExec(
+ cursor: Expression,
+ targetVariables: Seq[VariableReference]) extends LeafV2CommandExec {
+
+ override protected def run(): Seq[InternalRow] = {
+ // Extract CursorReference from the resolved cursor expression
+ val cursorRef =
cursor.asInstanceOf[org.apache.spark.sql.catalyst.expressions.CursorReference]
+
+ val scriptingContext =
CursorCommandUtils.getScriptingContext(cursorRef.sql)
+ val variableManager =
SqlScriptingContextManager.get().get.getVariableManager
+
+ // Get current cursor state
+ val currentState = scriptingContext.currentFrame.getCursorState(
Review Comment:
Can we open cursor in a script body, and then fetch in exception handler
body?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CloseCursorExec.scala:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.scripting.CursorClosed
+
+/**
+ * Physical plan node for closing cursors.
+ *
+ * Transitions cursor from Opened or Fetching state to Closed state, releasing
resources.
+ * Closing an already closed cursor or a declared-but-not-opened cursor raises
an error.
+ *
+ * @param cursor CursorReference resolved during analysis phase
+ */
+case class CloseCursorExec(cursor: Expression) extends LeafV2CommandExec {
+
+ override protected def run(): Seq[InternalRow] = {
+ // Extract CursorReference from the resolved cursor expression
+ val cursorRef =
cursor.asInstanceOf[org.apache.spark.sql.catalyst.expressions.CursorReference]
Review Comment:
nit: Can we import this properly and use only `CursorReference`?
Also there are multiple places in code where this can be improved.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CloseCursorExec.scala:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.scripting.CursorClosed
+
+/**
+ * Physical plan node for closing cursors.
+ *
+ * Transitions cursor from Opened or Fetching state to Closed state, releasing
resources.
+ * Closing an already closed cursor or a declared-but-not-opened cursor raises
an error.
+ *
+ * @param cursor CursorReference resolved during analysis phase
+ */
+case class CloseCursorExec(cursor: Expression) extends LeafV2CommandExec {
+
+ override protected def run(): Seq[InternalRow] = {
+ // Extract CursorReference from the resolved cursor expression
+ val cursorRef =
cursor.asInstanceOf[org.apache.spark.sql.catalyst.expressions.CursorReference]
+
+ val scriptingContext =
CursorCommandUtils.getScriptingContext(cursorRef.sql)
+
+ // Get current cursor state and validate it exists
+ val currentState = scriptingContext.currentFrame.getCursorState(
Review Comment:
Similar question to the one in Fetch Cursor, is there a way to open cursor
in a script body, use it, and then close it in exception handler?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]