tdas commented on a change in pull request #33093:
URL: https://github.com/apache/spark/pull/33093#discussion_r662653872
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
##########
@@ -1278,9 +1276,21 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
val flatMapGroupsWithStateFunc =
(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val valList = values.toSeq
+ if (valList.isEmpty) {
+ // When the function is called on just the initial state make sure the other fields
+ // are set correctly
+ assert(state.exists)
+ assertCanGetProcessingTime { state.getCurrentProcessingTimeMs() >= 0 }
+ assertCannotGetWatermark { state.getCurrentWatermarkMs() }
+ assert(!state.hasTimedOut)
+ }
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
- state.update(new RunningCount(count))
- Iterator((key, valList, state.get.count.toString))
+ // We need to check if not explicitly calling update will still save the init state or not
+ if (valList.nonEmpty || state.getOption.map(_.count).getOrElse(0L) != 2L) {
+ // this is not reached when valList is empty and the state count is 2
+ state.update(new RunningCount(count))
Review comment:
Rather than this complicated logic of skipping the update when some
magical set of conditions is met, isn't it simpler to have
`if (!key.contains("NoUpdate")) state.update(...)`
and then pass a key named `keyOnlyInStateButNoUpdate` or
`keyInStateAndDataButNoUpdate`?
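
To make the suggestion concrete, here is a rough sketch of how the function could look. It is only an illustration: `FakeState` is a hypothetical stand-in so the snippet runs without Spark (the real test would keep `GroupState[RunningCount]`), `RunningCount` is redeclared locally, and emitting `count.toString` instead of `state.get.count.toString` is an assumption made so the no-update path does not depend on the stored value.

```scala
object NoUpdateSketch {
  // Local copy of the test's state class, redeclared so the sketch is self-contained.
  case class RunningCount(count: Long)

  // Hypothetical minimal state holder; NOT Spark's GroupState API.
  final class FakeState[S](initial: Option[S]) {
    private var value: Option[S] = initial
    var updated: Boolean = false // records whether update() was ever called
    def getOption: Option[S] = value
    def update(s: S): Unit = { value = Some(s); updated = true }
  }

  def countFunc(
      key: String,
      values: Iterator[String],
      state: FakeState[RunningCount]): Iterator[(String, Seq[String], String)] = {
    val valList = values.toSeq
    val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
    // The key name, not a magic state value, decides whether update is skipped.
    if (!key.contains("NoUpdate")) {
      state.update(RunningCount(count))
    }
    Iterator((key, valList, count.toString))
  }
}
```

With this shape, the test data alone documents which keys exercise the "initial state is kept without an explicit update" path, for example `keyOnlyInStateButNoUpdate` with no input rows and `keyInStateAndDataButNoUpdate` with both initial state and rows.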
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]