HeartSaVioR commented on code in PR #46825:
URL: https://github.com/apache/spark/pull/46825#discussion_r1631685631
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
Review Comment:
Same.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala:
##########
@@ -38,7 +38,7 @@ class ListStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
TimeMode.ProcessingTime())
Review Comment:
nit: same as above — I wanted to know whether this change is just a convenience or actually required.
##########
sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:
##########
@@ -206,7 +206,7 @@ public void testInitialStateForTransformWithState() {
Dataset<String> transformWithStateMapped = grouped.transformWithState(
new TestStatefulProcessorWithInitialState(),
- TimeMode.None(),
+ TimeMode.ProcessingTime(),
Review Comment:
nit: just to be fully sure, do I understand correctly that either ProcessingTime
or EventTime works here? If only one of them is a valid replacement for None,
that would be better documented.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -632,9 +632,9 @@ class MicroBatchExecution(
// Check whether next batch should be constructed
val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
// need to check the execution plan of the previous batch
- execCtx.previousContext.map { plan =>
+ execCtx.previousContext.exists { plan =>
Review Comment:
nit: again, please move this unrelated change into a separate minor PR.
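For context on the `.map` → `.exists` change itself: mapping a Boolean predicate over an `Option` yields an `Option[Boolean]` that still has to be unwrapped, whereas `exists` collapses directly to a `Boolean`, returning `false` for `None`. A minimal standalone illustration using Java's `Optional` (not Spark code; Java's analogue of Scala's `Option.exists` is `filter(...).isPresent()`):

```java
import java.util.Optional;

public class OptionExistsDemo {
    public static void main(String[] args) {
        Optional<Integer> previous = Optional.of(3);
        Optional<Integer> none = Optional.empty();

        // map keeps the wrapper: we get Optional<Boolean>, not boolean
        Optional<Boolean> mapped = previous.map(v -> v > 2);

        // filter(...).isPresent() collapses directly to boolean,
        // yielding false for an empty Optional (like Scala's Option.exists)
        boolean existsPresent = previous.filter(v -> v > 2).isPresent();
        boolean existsEmpty = none.filter(v -> v > 2).isPresent();

        System.out.println(mapped.orElse(false)); // true
        System.out.println(existsPresent);        // true
        System.out.println(existsEmpty);          // false
    }
}
```

So `exists` avoids the extra unwrapping step (e.g. a trailing `getOrElse(false)`), which is why it is the more idiomatic choice for a Boolean check on an `Option`.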
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala:
##########
Review Comment:
I wonder why this test change is required, given that the code change is just
removing TimeMode.None. Are we also fixing some test flakiness here, or what is
the rationale for the manual clock?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala:
##########
@@ -27,8 +27,7 @@ import org.apache.spark.sql.streaming.{ExpiredTimerInfo,
TimeMode}
*/
class ExpiredTimerInfoImpl(
isValid: Boolean,
- expiryTimeInMsOpt: Option[Long] = None,
- timeMode: TimeMode = TimeMode.None()) extends ExpiredTimerInfo {
Review Comment:
Do we assume we no longer need to indicate whether the expired timer came from
event-time vs. processing-time semantics? What was the rationale for adding
these fields in the first place, and why can they be removed when we are only
removing None?
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
Review Comment:
Same question.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]