ueshin commented on a change in pull request #31367:
URL: https://github.com/apache/spark/pull/31367#discussion_r566449042
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
##########
@@ -88,14 +102,22 @@ class ArrowPythonRunner(
while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()
+ val startTime = System.nanoTime()
while (nextBatch.hasNext) {
arrowWriter.write(nextBatch.next())
}
arrowWriter.finish()
writer.writeBatch()
+
+ val deltaTime = System.nanoTime() - startTime
+ pythonSerializeTime += deltaTime
+ val rowCount = root.getRowCount
+ pythonNumBatchesSent += 1
+ pythonNumRowsSent += rowCount
arrowWriter.reset()
}
+ pythonDataSent += dataOut.size()
Review comment:
ditto.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
##########
@@ -72,6 +83,9 @@ class ArrowPythonRunner(
}
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+ val deltaTime = System.nanoTime()-startTime
Review comment:
nit: style: need spaces before and after `-`.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
##########
@@ -72,6 +83,9 @@ class ArrowPythonRunner(
}
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+ val deltaTime = System.nanoTime()-startTime
+ pythonCodeSerializeTime += deltaTime
+ pythonCodeSent += dataOut.size()
Review comment:
I guess we also need to use the delta of the size from before to after
the code is sent?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
##########
@@ -26,16 +26,22 @@ import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamReader
import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.api.python.{BasePythonRunner, SpecialLengths}
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions,
SpecialLengths}
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch,
ColumnVector}
-/**
- * A trait that can be mixed-in with [[BasePythonRunner]]. It implements the
logic from
- * Python (Arrow) to JVM (ColumnarBatch).
- */
-private[python] trait PythonArrowOutput { self: BasePythonRunner[_,
ColumnarBatch] =>
+// An abstract class implementing the logic from Python (Arrow) to JVM
(ColumnarBatch).
+private[python] abstract class PythonArrowOutput[IN](
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]],
+ pythonDataReceived: SQLMetric,
+ pythonExecTime: SQLMetric,
+ pythonNumRowsReceived: SQLMetric,
+ pythonNumBatchesReceived: SQLMetric)
Review comment:
nit: indent with 4 spaces here.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
##########
@@ -71,10 +77,19 @@ private[python] trait PythonArrowOutput { self:
BasePythonRunner[_, ColumnarBatc
}
try {
if (reader != null && batchLoaded) {
+ val bytesReadStart = reader.bytesRead()
+ val startTime = System.nanoTime()
batchLoaded = reader.loadNextBatch()
+ val deltaTime = System.nanoTime() - startTime
Review comment:
Same here? Seems like it's just calculating the batch loading time?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
##########
@@ -83,12 +97,19 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF],
resultAttrs: Seq[Attribute]
val unpickledBatch = unpickle.loads(pickedResult)
unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
}.map { result =>
+ pythonNumRowsReceived += 1
+ val startTime = System.nanoTime()
if (udfs.length == 1) {
// fast path for single UDF
mutableRow(0) = fromJava(result)
+ val deltaTime = System.nanoTime() - startTime
+ pythonExecTime += deltaTime
mutableRow
} else {
- fromJava(result).asInstanceOf[InternalRow]
+ val res = fromJava(result).asInstanceOf[InternalRow]
+ val deltaTime = System.nanoTime() - startTime
+ pythonExecTime += deltaTime
Review comment:
Is this really calculating the exec time? Seems like it's only
calculating the time for `fromJava(result)`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]