HyukjinKwon commented on code in PR #44305:
URL: https://github.com/apache/spark/pull/44305#discussion_r1423230523


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -55,84 +55,102 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
     val inputRDD = child.execute().map(_.copy())
 
     inputRDD.mapPartitions { iter =>
-      val context = TaskContext.get()
-
-      // The queue used to buffer input rows so we can drain it to
-      // combine input with output from Python.
-      val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
-      context.addTaskCompletionListener[Unit] { ctx =>
-        queue.close()
-      }
+      EvalPythonUDTFExec.execute(
+        iter, evaluate, udtf, output.length, child.output, requiredChildOutput, output)
+    }
+  }
+}
 
-      // flatten all the arguments
-      val allInputs = new ArrayBuffer[Expression]
-      val dataTypes = new ArrayBuffer[DataType]
-      val argMetas = udtf.children.map { e =>
-        val (key, value) = e match {
-          case NamedArgumentExpression(key, value) =>
-            (Some(key), value)
-          case _ =>
-            (None, e)
-        }
-        if (allInputs.exists(_.semanticEquals(value))) {
-          ArgumentMetadata(allInputs.indexWhere(_.semanticEquals(value)), key)
-        } else {
-          allInputs += value
-          dataTypes += value.dataType
-          ArgumentMetadata(allInputs.length - 1, key)
-        }
-      }.toArray
-      val projection = MutableProjection.create(allInputs.toSeq, child.output)
-      projection.initialize(context.partitionId())
-      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
-        StructField(s"_$i", dt)
-      }.toArray)
-
-      // Add rows to the queue to join later with the result.
-      // Also keep track of the number rows added to the queue.
-      // This is needed to process extra output rows from the `terminate()` call of the UDTF.
-      var count = 0L
-      val projectedRowIter = iter.map { inputRow =>
-        queue.add(inputRow.asInstanceOf[UnsafeRow])
-        count += 1
-        projection(inputRow)
+object EvalPythonUDTFExec {

Review Comment:
   Just moving the code around. It's the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to