bogao007 commented on code in PR #48373:
URL: https://github.com/apache/spark/pull/48373#discussion_r1819943103


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -503,7 +503,11 @@ def transformWithStateUDF(
             statefulProcessorApiClient.set_implicit_key(key)
             result = statefulProcessor.handleInputRows(key, inputRows)
 
-            return result
+            try:
+                yield result
+            finally:
+                statefulProcessor.close()

Review Comment:
   I just realized there is an issue here: `close()` is actually called after 
processing each grouping key, rather than once after all keys in a microbatch 
have been processed. I'll need to revisit this to see if there's a good way to 
handle it (right now I can't think of a good way to detect whether the current 
key is the last key to process). If it's not a quick fix, we can probably 
exclude it for now and fix it in a follow-up PR. cc @HeartSaVioR 
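
   To illustrate the concern, here is a minimal standalone sketch. It does not 
use the PySpark API: `CountingProcessor`, `per_key_udf`, `run_close_per_key` and 
`run_close_per_batch` are hypothetical names made up for this example, and the 
dict stands in for one microbatch. It only shows that a `try`/`finally` inside a 
per-key function fires once per key, whereas closing in the caller's loop fires 
once per batch. Whether the real code path has a suitable "caller's loop" to 
hook into is exactly the open question.

```python
from typing import Dict, Iterator, List, Tuple


class CountingProcessor:
    """Hypothetical stand-in for a stateful processor; it counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    def handleInputRows(self, key: str, rows: List[int]) -> Iterator[Tuple[str, int]]:
        # Emit one aggregated row per grouping key.
        yield (key, sum(rows))

    def close(self) -> None:
        self.close_calls += 1


def per_key_udf(processor: CountingProcessor, key: str, rows: List[int]):
    # Same shape as the diff: the finally block runs every time this
    # per-key generator is exhausted, i.e. once per grouping key.
    try:
        yield from processor.handleInputRows(key, rows)
    finally:
        processor.close()


def run_close_per_key(processor: CountingProcessor, batch: Dict[str, List[int]]):
    out = []
    for key, rows in batch.items():
        out.extend(per_key_udf(processor, key, rows))
    return out


def run_close_per_batch(processor: CountingProcessor, batch: Dict[str, List[int]]):
    # Assumed alternative: close once, after every key has been handled.
    out = []
    try:
        for key, rows in batch.items():
            out.extend(processor.handleInputRows(key, rows))
    finally:
        processor.close()
    return out


if __name__ == "__main__":
    batch = {"a": [1, 2], "b": [3]}  # one simulated microbatch with two keys
    p1, p2 = CountingProcessor(), CountingProcessor()
    run_close_per_key(p1, batch)
    run_close_per_batch(p2, batch)
    print(p1.close_calls, p2.close_calls)
```

   With two keys in the simulated batch, the per-key variant closes the 
processor twice and the per-batch variant closes it once.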



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:
##########
@@ -106,6 +106,30 @@ case class TransformWithStateInPandasExec(
     List.empty
   }
 
+    // operator specific metrics
+  override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = {

Review Comment:
   Added a Python test to verify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

