This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7606ad32d61 [SPARK-44484][SS] Add batchDuration to StreamingQueryProgress json method 7606ad32d61 is described below commit 7606ad32d6163d0219bec62176d185815de4eebc Author: Wei Liu <wei....@databricks.com> AuthorDate: Fri Jul 21 13:49:03 2023 +0900 [SPARK-44484][SS] Add batchDuration to StreamingQueryProgress json method ### What changes were proposed in this pull request? Add the missing field batchDuration to StreamingQueryProgress json method. Also modify tests accordingly ### Why are the changes needed? Add a missing field ### Does this PR introduce _any_ user-facing change? Probably yes - in their call to `query.lastProgress` or `query.recentProgress` and inside listener this new field will show up ### How was this patch tested? Existing unit tests Closes #42077 from WweiL/SPARK-44484-missing-json-field-progress. Authored-by: Wei Liu <wei....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala | 1 + .../org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala | 1 + sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala | 1 + .../spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala | 2 ++ 4 files changed, 5 insertions(+) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 123b3306f2a..8370a336abb 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -182,6 +182,7 @@ class StreamingQueryProgress private[spark] ( ("name" -> JString(name)) ~ ("timestamp" -> JString(timestamp)) ~ ("batchId" -> JInt(batchId)) ~ + ("batchDuration" -> JInt(batchDuration)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala index a6a44c1bd71..2911e4e016e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala @@ -161,6 +161,7 @@ class StreamingQueryProgressSuite extends ConnectFunSuite { | "name" : "myName", | "timestamp" : "2016-12-05T20:54:20.827Z", | "batchId" : 2, + | "batchDuration" : 0, | "numInputRows" : 1467, | "inputRowsPerSecond" : 22.0, | "durationMs" : { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 5cce52b9c0f..385157a09c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -166,6 +166,7 @@ class StreamingQueryProgress private[spark]( ("name" -> JString(name)) ~ ("timestamp" -> JString(timestamp)) ~ ("batchId" -> JInt(batchId)) ~ + ("batchDuration" -> JInt(batchDuration)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index d016b334627..1b6005257c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -46,6 +46,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "name" : "myName", | "timestamp" : "2016-12-05T20:54:20.827Z", | "batchId" : 2, + | "batchDuration" : 0, | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, | "durationMs" : { @@ -112,6 +113,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "name" : null, | "timestamp" : "2016-12-05T20:54:20.827Z", | "batchId" : 2, + | "batchDuration" : 0, | "numInputRows" : 678, | "durationMs" : { | "total" : 0 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org