ueshin commented on a change in pull request #31367:
URL: https://github.com/apache/spark/pull/31367#discussion_r566449042



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
##########
@@ -88,14 +102,22 @@ class ArrowPythonRunner(
           while (inputIterator.hasNext) {
             val nextBatch = inputIterator.next()
 
+            val startTime = System.nanoTime()
             while (nextBatch.hasNext) {
               arrowWriter.write(nextBatch.next())
             }
 
             arrowWriter.finish()
             writer.writeBatch()
+
+            val deltaTime = System.nanoTime() - startTime
+            pythonSerializeTime += deltaTime
+            val rowCount = root.getRowCount
+            pythonNumBatchesSent += 1
+            pythonNumRowsSent += rowCount
             arrowWriter.reset()
           }
+          pythonDataSent += dataOut.size()

Review comment:
       ditto.
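       If this echoes the delta-size suggestion on the earlier hunk, a minimal
       sketch of the fix, reusing the names from the diff, could be:

       ```scala
       // Hypothetical fix: count only the bytes written for the Arrow data,
       // not the accumulated size of dataOut, which also includes the UDF
       // code sent earlier on the same stream.
       val sizeBefore = dataOut.size()
       while (inputIterator.hasNext) {
         // ... write and finish the Arrow batches as in the diff ...
       }
       pythonDataSent += dataOut.size() - sizeBefore
       ```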

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
##########
@@ -72,6 +83,9 @@ class ArrowPythonRunner(
         }
 
         PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+        val deltaTime = System.nanoTime()-startTime

Review comment:
       nit: style: need spaces before and after `-`.
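       For reference, the line as the style fix would have it:

       ```scala
       val deltaTime = System.nanoTime() - startTime
       ```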

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
##########
@@ -72,6 +83,9 @@ class ArrowPythonRunner(
         }
 
         PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+        val deltaTime = System.nanoTime()-startTime
+        pythonCodeSerializeTime += deltaTime
+        pythonCodeSent += dataOut.size()

Review comment:
       I guess we also need to use the delta of `dataOut.size()` between before and after the code is sent?
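       A minimal sketch of that delta-size approach, assuming `startTime` is
       captured just before the code is written, as in the surrounding diff:

       ```scala
       val startTime = System.nanoTime()
       val sizeBefore = dataOut.size()

       PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)

       pythonCodeSerializeTime += System.nanoTime() - startTime
       // Attribute only the bytes written by writeUDFs to this metric,
       // rather than everything written to dataOut so far.
       pythonCodeSent += dataOut.size() - sizeBefore
       ```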

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
##########
@@ -26,16 +26,22 @@ import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamReader
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.api.python.{BasePythonRunner, SpecialLengths}
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, SpecialLengths}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
 
-/**
- * A trait that can be mixed-in with [[BasePythonRunner]]. It implements the logic from
- * Python (Arrow) to JVM (ColumnarBatch).
- */
-private[python] trait PythonArrowOutput { self: BasePythonRunner[_, ColumnarBatch] =>
+// An abstract class implementing the logic from Python (Arrow) to JVM (ColumnarBatch).
+private[python] abstract class PythonArrowOutput[IN](
+                           funcs: Seq[ChainedPythonFunctions],
+                           evalType: Int,
+                           argOffsets: Array[Array[Int]],
+                           pythonDataReceived: SQLMetric,
+                           pythonExecTime: SQLMetric,
+                           pythonNumRowsReceived: SQLMetric,
+                           pythonNumBatchesReceived: SQLMetric)

Review comment:
       nit: indent with 4 spaces here.
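       For reference, the declaration with the 4-space continuation indent
       used elsewhere in Spark:

       ```scala
       private[python] abstract class PythonArrowOutput[IN](
           funcs: Seq[ChainedPythonFunctions],
           evalType: Int,
           argOffsets: Array[Array[Int]],
           pythonDataReceived: SQLMetric,
           pythonExecTime: SQLMetric,
           pythonNumRowsReceived: SQLMetric,
           pythonNumBatchesReceived: SQLMetric)
       ```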

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
##########
@@ -71,10 +77,19 @@ private[python] trait PythonArrowOutput { self: BasePythonRunner[_, ColumnarBatc
         }
         try {
           if (reader != null && batchLoaded) {
+            val bytesReadStart = reader.bytesRead()
+            val startTime = System.nanoTime()
             batchLoaded = reader.loadNextBatch()
+            val deltaTime = System.nanoTime() - startTime

Review comment:
       Same here? Seems like it's just calculating the batch loading time?
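       As a sketch of that reading, the measured interval could be attributed
       to a deserialization metric instead; `pythonDeserializeTime` below is a
       hypothetical name, not one from the patch:

       ```scala
       val bytesReadStart = reader.bytesRead()
       val startTime = System.nanoTime()
       batchLoaded = reader.loadNextBatch()
       // This delta covers only loading (deserializing) the Arrow batch on
       // the JVM side, not the Python UDF execution itself.
       pythonDeserializeTime += System.nanoTime() - startTime
       pythonDataReceived += reader.bytesRead() - bytesReadStart
       ```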

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
##########
@@ -83,12 +97,19 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
       val unpickledBatch = unpickle.loads(pickedResult)
       unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
     }.map { result =>
+      pythonNumRowsReceived += 1
+      val startTime = System.nanoTime()
       if (udfs.length == 1) {
         // fast path for single UDF
         mutableRow(0) = fromJava(result)
+        val deltaTime = System.nanoTime() - startTime
+        pythonExecTime += deltaTime
         mutableRow
       } else {
-        fromJava(result).asInstanceOf[InternalRow]
+        val res = fromJava(result).asInstanceOf[InternalRow]
+        val deltaTime = System.nanoTime() - startTime
+        pythonExecTime += deltaTime

Review comment:
       Is this really calculating the exec time? Seems like it's only 
calculating the time for `fromJava(result)`?
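       As a sketch of the distinction, the timed span only covers the JVM-side
       conversion; `pythonConversionTime` is a hypothetical metric name:

       ```scala
       val startTime = System.nanoTime()
       val res = fromJava(result).asInstanceOf[InternalRow]
       // Only the fromJava conversion is timed here; the Python UDF itself
       // already ran when the result iterator produced `result`.
       pythonConversionTime += System.nanoTime() - startTime
       res
       ```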



