Nishanth28 commented on code in PR #52931:
URL: https://github.com/apache/spark/pull/52931#discussion_r2536492635


##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -1015,7 +1036,10 @@ private[spark] class PythonRunner(
         try {
           stream.readInt() match {
             case length if length >= 0 =>
-              PythonWorkerUtils.readBytes(length, stream)
+              val data = PythonWorkerUtils.readBytes(length, stream)
+              recordsProcessed += 1
+              totalDataReceived += length

Review Comment:
   @ueshin PythonArrowOutput already tracks data via pythonMetrics
   ("pythonDataReceived" at line 175). However, it doesn't increment
   batchesProcessed/totalDataReceived, so the timing logs would show
   "Batches: 0, Data: 0 B".
   
   I can add batchesProcessed tracking in PythonArrowOutput.read() (line 96),
   though Arrow's data flow is different: batches are loaded via
   ArrowOutputProcessor rather than read directly in that method.
   
   Should I add this, or is the existing pythonMetrics tracking sufficient for 
   Arrow runners? Let me know your preference!
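
   For reference, here is a rough, Spark-free sketch of the counting pattern
   in question. `ReadStats`, `ReadStatsSketch`, `readAll`, and `frames` are
   hypothetical names used only for illustration, and the negative length used
   as an end marker is an assumption; none of this is the actual PythonRunner
   or PythonArrowOutput code. It only shows why counters that are never
   incremented produce "Batches: 0, Data: 0 B".

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for the counters discussed above (not a Spark class).
final class ReadStats {
  var batchesProcessed: Long = 0L
  var totalDataReceived: Long = 0L

  // Call once per frame/batch that was read successfully.
  def record(numBytes: Long): Unit = {
    batchesProcessed += 1
    totalDataReceived += numBytes
  }

  // Mirrors the shape of the timing log line quoted in the comment.
  def summary: String = s"Batches: $batchesProcessed, Data: $totalDataReceived B"
}

object ReadStatsSketch {
  // Reads length-prefixed frames until a negative length is seen
  // (assumed end marker), updating the stats once per frame.
  def readAll(stream: DataInputStream, stats: ReadStats): Unit = {
    var length = stream.readInt()
    while (length >= 0) {
      val buf = new Array[Byte](length)
      stream.readFully(buf)
      stats.record(length.toLong)
      length = stream.readInt()
    }
  }

  // Builds an in-memory stream of length-prefixed frames for experimentation.
  def frames(payloads: Seq[Array[Byte]]): DataInputStream = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    payloads.foreach { p =>
      out.writeInt(p.length)
      out.write(p)
    }
    out.writeInt(-1) // assumed end-of-stream marker
    out.flush()
    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
  }
}
```

   A reader that skips the `record` call leaves `summary` at its initial
   value, which is the situation described for the Arrow path.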



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

