dusantism-db commented on code in PR #48794:
URL: https://github.com/apache/spark/pull/48794#discussion_r1834330570


##########
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala:
##########
@@ -649,3 +652,145 @@ class LoopStatementExec(
     body.reset()
   }
 }
+
+/**
+ * Executable node for ForStatement.
+ * @param query Executable node for the query.
+ * @param variableName Name of variable used for accessing current row during iteration.
+ * @param body Executable node for the body. If variableName is not None, will have DropVariable
+ *             as the last statement.
+ * @param label Label set to ForStatement by user or None otherwise.
+ * @param session Spark session that SQL script is executed within.
+ */
+class ForStatementExec(
+    query: SingleStatementExec,
+    variableName: Option[String],
+    body: CompoundBodyExec,
+    label: Option[String],
+    session: SparkSession) extends NonLeafStatementExec {
+
+  private object ForState extends Enumeration {
+    val VariableDeclaration, VariableAssignment, Body = Value
+  }
+  private var state = ForState.VariableDeclaration
+  private var currRow = 0
+  private var currVariable: Expression = null
+
+  private var queryResult: Array[Row] = null
+  private var isResultCacheValid = false
+  private def cachedQueryResult(): Array[Row] = {
+    if (!isResultCacheValid) {
+      query.isExecuted = true
+      queryResult = Dataset.ofRows(session, query.parsedPlan).collect()
+      isResultCacheValid = true
+    }
+    queryResult
+  }
+
+  /**
+   * Loop can be interrupted by LeaveStatementExec
+   */
+  private var interrupted: Boolean = false
+
+  private lazy val treeIterator: Iterator[CompoundStatementExec] =
+    new Iterator[CompoundStatementExec] {
+      override def hasNext: Boolean =
+        !interrupted && cachedQueryResult().length > 0 && currRow < cachedQueryResult().length
+
+      override def next(): CompoundStatementExec = state match {
+        case ForState.VariableDeclaration =>
+          // when there is no for variable, skip var declaration and iterate only the body
+          if (variableName.isEmpty) {
+            state = ForState.Body
+            body.reset()
+            return next()
+          }
+          currVariable = createExpressionFromValue(cachedQueryResult()(currRow))
+          state = ForState.VariableAssignment
+          createDeclareVarExec(variableName.get, currVariable)
+
+        case ForState.VariableAssignment =>
+          state = ForState.Body
+          body.reset()
+          createSetVarExec(variableName.get, currVariable)
+
+        case ForState.Body =>
+          val retStmt = body.getTreeIterator.next()
+
+          // Handle LEAVE or ITERATE statement if it has been encountered.
+          retStmt match {
+            case leaveStatementExec: LeaveStatementExec if !leaveStatementExec.hasBeenMatched =>
+              if (label.contains(leaveStatementExec.label)) {
+                leaveStatementExec.hasBeenMatched = true
+              }
+              interrupted = true
+              return retStmt
+            case iterStatementExec: IterateStatementExec if !iterStatementExec.hasBeenMatched =>
+              if (label.contains(iterStatementExec.label)) {
+                iterStatementExec.hasBeenMatched = true
+              }
+              currRow += 1
+              state = ForState.VariableDeclaration
+              return retStmt
+            case _ =>
+          }
+
+          if (!body.getTreeIterator.hasNext) {
+            currRow += 1
+            state = ForState.VariableDeclaration
+          }
+          retStmt
+      }
+    }
+
+  /**
+   * Creates a Catalyst expression from Scala value.
+   */
+  private def createExpressionFromValue(value: Any): Expression = value match {
+    case m: Map[_, _] =>
+      // arguments of CreateMap are in the format: (key1, val1, key2, val2, ...)
+      val mapArgs = m.keys.toSeq.flatMap { key =>
+        Seq(createExpressionFromValue(key), createExpressionFromValue(m(key)))
+      }
+      CreateMap(mapArgs, false)
+    case s: GenericRowWithSchema =>
+      // struct types match this case
+      // arguments of CreateNamedStruct are in the format: (name1, val1, name2, val2, ...)
+      val namedStructArgs = s.schema.names.toSeq.flatMap { colName =>
+        val valueExpression = createExpressionFromValue(s.getAs(colName))
+        Seq(Literal(colName), valueExpression)
+      }
+      CreateNamedStruct(namedStructArgs)
+    case _ => Literal(value)

Review Comment:
   When a Map or Struct appears in the result set of the query, we can't use Literal(value) to convert it to an expression, because Literal doesn't support those types. So, for a Map, we first recursively convert the keys and values to expressions and then build a map expression with CreateMap; the process is analogous for structs, using CreateNamedStruct.
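   For illustration, a minimal standalone sketch (not part of the PR) of the behavior described above. The exact exception type Literal throws may differ across Spark versions, so the catch here is deliberately broad:
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{CreateMap, CreateNamedStruct, Literal}

   object LiteralVsCreateMapDemo extends App {
     // Literal.apply has no case for Scala Map values, so this throws an
     // "unsupported literal type" style error at runtime:
     try Literal(Map("a" -> 1)) catch {
       case e: RuntimeException => println(s"Literal rejects maps: ${e.getMessage}")
     }

     // Converting each key/value to an expression first and wrapping them in
     // CreateMap (args in (key1, val1, key2, val2, ...) order) works:
     val mapExpr = CreateMap(Seq(Literal("a"), Literal(1)), useStringTypeWhenEmpty = false)
     println(mapExpr.dataType) // MapType(StringType, IntegerType, false)

     // Same idea for structs, with args in (name1, val1, name2, val2, ...) order:
     val structExpr = CreateNamedStruct(Seq(Literal("x"), Literal(42)))
     println(structExpr.dataType) // StructType with a single int field "x"
   }
   ```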
   
   The way I checked was to go through all of the Spark data types and, for each one, verify in the Literal code whether it is supported. These two were the only unsupported ones I found. That said, I agree we can't be completely sure: new types will be added to Spark in the future, and Literal may or may not support them. I should probably add an error message for any currently unsupported type, in case one comes up. Does that make sense to you?
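   If that fallback error lands, one possible shape is sketched below; the object and helper names, exception type, and message are hypothetical, not what this PR adds:
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

   // Sketch only: intended as the default case of createExpressionFromValue.
   object ForStatementLiteralFallback {
     // Keep Literal's behavior for supported types, but surface a clear,
     // scripting-specific error for anything Literal cannot represent.
     def literalOrError(value: Any): Expression =
       try Literal(value) catch {
         case e: RuntimeException =>
           throw new UnsupportedOperationException(
             s"FOR statement variable has unsupported type: " +
               s"${Option(value).map(_.getClass.getName).getOrElse("null")}", e)
       }
   }
   ```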


