sahnib commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1554103768


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -312,187 +312,109 @@ class ValueStateSuite extends StateVariableSuiteBase {
     }
   }
 
-  Seq(TTLMode.ProcessingTimeTTL(), TTLMode.EventTimeTTL()).foreach { ttlMode =>
-    test(s"test Value state TTL for $ttlMode") {
-      tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
-        val store = provider.getStore(0)
-        val timestampMs = 10
-        val handle = createHandleForTtlMode(ttlMode, store, timestampMs)
-
-        val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-          Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-        testState.update("v1")
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        var ttlValue = testState.getTTLValue()
-        assert(ttlValue.isEmpty)
-        var ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.isEmpty)
-
-        testState.clear()
-        assert(!testState.exists())
-        assert(testState.get() === null)
-
-        val ttlExpirationMs = timestampMs + 60000
-
-        if (ttlMode == TTLMode.ProcessingTimeTTL()) {
-          testState.update("v1", Duration.ofMinutes(1))
-        } else {
-          testState.update("v1", ttlExpirationMs)
-        }
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        ttlValue = testState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // increment batchProcessingTime, or watermark and ensure expired 
value is not returned
-        val nextBatchHandle = createHandleForTtlMode(ttlMode, store, 
ttlExpirationMs)
-
-        val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle
-          .getValueState[String]("testState", Encoders.STRING)
-          .asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-
-        // ensure get does not return the expired value
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        // ttl value should still exist in state
-        ttlValue = nextBatchTestState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = nextBatchTestState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // getWithoutTTL should still return the expired value
-        assert(nextBatchTestState.getWithoutEnforcingTTL().get === "v1")
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-      }
-    }
-  }
 
-  test("test TTL duration throws error for event time") {
+  test(s"test Value state TTL") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
-      val eventTimeWatermarkMs = 10
+      val timestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.EventTimeTTL(), TimeoutMode.NoTimeouts(),
-        eventTimeWatermarkMs = Some(eventTimeWatermarkMs))
+        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        batchTimestampMs = Some(timestampMs))
 
+      val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1))
       val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-        Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
+        Encoders.STRING, ttlConfig).asInstanceOf[ValueStateImplWithTTL[String]]
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+      testState.update("v1")

Review Comment:
   It should not be. I missed updating this test; somehow the local run did 
not initially show it as failing. Once I cleaned everything and rebuilt, the 
failure showed up. Fixed the test case. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to