HeartSaVioR commented on code in PR #48373:
URL: https://github.com/apache/spark/pull/48373#discussion_r1827248795
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -194,6 +195,10 @@ def test_transform_with_state_in_pandas_query_restarts(self):
q.awaitTermination(10)
self.assertTrue(q.exception() is None)
+ # Verify custom metrics. We created 2 value states in this test case and deleted 1 of them.
Review Comment:
ditto, let's move this out.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:
##########
@@ -106,6 +106,30 @@ case class TransformWithStateInPandasExec(
List.empty
}
+ // operator specific metrics
Review Comment:
Same here: shall we move the metrics change (and its test) out to a separate JIRA
ticket and a corresponding PR?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:
##########
@@ -154,6 +178,7 @@ case class TransformWithStateInPandasExec(
// by the upstream (consumer) operators in addition to the processing in this operator.
allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
commitTimeMs += timeTakenMs {
+ processorHandle.doTtlCleanup()
Review Comment:
Is this a fix for an existing bug? If so, please file a new JIRA ticket and
submit a separate PR. Let's not mix unrelated changes into this one.
##########
sql/core/src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java:
##########
Review Comment:
Friendly reminder: we can remove this now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]