davidm-db commented on code in PR #47423:
URL: https://github.com/apache/spark/pull/47423#discussion_r1713935941
##########
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala:
##########
@@ -127,20 +148,112 @@ class SingleStatementExec(
origin.sqlText.get.substring(origin.startIndex.get, origin.stopIndex.get + 1)
}
- override def reset(): Unit = isExecuted = false
+ override def reset(): Unit = {
+ raisedError = false
+ errorState = None
+ error = None
+ rethrow = None
+ result = None // Should we do this?
+ }
+
+ override def execute(session: SparkSession): Unit = {
+ try {
+ val rows = Some(Dataset.ofRows(session, parsedPlan).collect())
+ if (shouldCollectResult) {
+ result = rows
+ }
+ } catch {
+ case e: SparkThrowable =>
+ raisedError = true
+ errorState = Some(e.getSqlState)
+ error = Some(e)
+ e match {
+ case throwable: Throwable =>
+ rethrow = Some(throwable)
+ case _ =>
+ }
+ case throwable: Throwable =>
+ raisedError = true
+ errorState = Some("SQLEXCEPTION")
+ rethrow = Some(throwable)
+ }
+ }
}
/**
- * Abstract class for all statements that contain nested statements.
- * Implements recursive iterator logic over all child execution nodes.
- * @param collection
- * Collection of child execution nodes.
+ * Executable node for CompoundBody.
+ * @param statements
+ * Executable nodes for nested statements within the CompoundBody.
+ * @param session
+ * Spark session.
*/
-abstract class CompoundNestedStatementIteratorExec(collection: Seq[CompoundStatementExec])
+class CompoundBodyExec(
+ label: Option[String] = None,
+ statements: Seq[CompoundStatementExec],
+ conditionHandlerMap: mutable.HashMap[String, ErrorHandlerExec] = mutable.HashMap(),
+ session: SparkSession)
extends NonLeafStatementExec {
- private var localIterator = collection.iterator
- private var curr = if (localIterator.hasNext) Some(localIterator.next()) else None
+ private def getHandler(condition: String): Option[ErrorHandlerExec] = {
+ conditionHandlerMap.get(condition)
+ .orElse(conditionHandlerMap.get("NOT FOUND") match {
+ case Some(handler) if condition.startsWith("02") => Some(handler)
+ case _ => None
+ })
+ .orElse(conditionHandlerMap.get("SQLEXCEPTION"))
+ }
+
+ /**
+ * Handle error raised during the execution of the statement.
+ * @param statement statement that possibly raised the error
+ * @return pass through the statement
+ */
+ private def handleError(statement: LeafStatementExec): LeafStatementExec = {
+ if (statement.raisedError) {
+ getHandler(statement.errorState.get).foreach { handler =>
+ statement.reset() // Clear all flags and result
+ handler.reset()
+ returnHere = curr
+ curr = Some(handler.getHandlerBody)
+ }
+ }
+ statement
+ }
+
+ /**
+ * Check if the leave statement was used, if it is not used stop iterating surrounding
+ * [[CompoundBodyExec]] and move iterator forward. If the label of the block matches the label of
+ * the leave statement, mark the leave statement as used.
+ * @param leave leave statement
+ * @return pass through the leave statement
+ */
+ private def handleLeave(leave: LeaveStatementExec): LeaveStatementExec = {
+ if (!leave.used) {
+ // Hard stop the iteration of the current begin/end block
+ stopIteration = true
+ // If label of the block matches the label of the leave statement,
+ // mark the leave statement as used
+ if (label.getOrElse("").equals(leave.getLabel)) {
+ leave.used = true
+ }
+ }
+ curr = if (localIterator.hasNext) Some(localIterator.next()) else None
+ leave
+ }
+
+ private var localIterator: Iterator[CompoundStatementExec] = statements.iterator
+ private var curr: Option[CompoundStatementExec] =
+ if (localIterator.hasNext) Some(localIterator.next()) else None
+ private var stopIteration: Boolean = false // hard stop iteration flag
Review Comment:
nit: add short comments here explaining what `stopIteration` and `returnHere` are used for
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]