anishshri-db commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1548593408


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -303,6 +311,244 @@ class ValueStateSuite extends StateVariableSuiteBase {
       assert(testState.get() === null)
     }
   }
+
+  test("test Value state TTL for processing time") {
+    tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
+      val store = provider.getStore(0)
+      val batchTimestampMs = 10
+      val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
+        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        batchTimestampMs = Some(batchTimestampMs))
+
+      val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
+        Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
+      ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+      testState.update("v1")
+      assert(testState.get() === "v1")
+      assert(testState.getWithoutEnforcingTTL().get === "v1")
+
+      var ttlValue = testState.getTTLValue()
+      assert(ttlValue.isEmpty)
+      var ttlStateValueIterator = testState.getValuesInTTLState()
+      assert(ttlStateValueIterator.isEmpty)
+
+      testState.clear()
+      assert(!testState.exists())
+      assert(testState.get() === null)
+
+      testState.update("v1", Duration.ofMinutes(1))
+      assert(testState.get() === "v1")
+      assert(testState.getWithoutEnforcingTTL().get === "v1")
+
+      val expectedTtlExpirationMs = batchTimestampMs + 60000
+      ttlValue = testState.getTTLValue()
+      assert(ttlValue.isDefined)
+      assert(ttlValue.get === expectedTtlExpirationMs)
+      ttlStateValueIterator = testState.getValuesInTTLState()
+      assert(ttlStateValueIterator.hasNext)
+      assert(ttlStateValueIterator.next() === expectedTtlExpirationMs)
+      assert(ttlStateValueIterator.isEmpty)
+
+      // increment batchProcessingTime and ensure expired value is not returned
+      val nextBatchHandle = new StatefulProcessorHandleImpl(store, 
UUID.randomUUID(),
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
+        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        batchTimestampMs = Some(expectedTtlExpirationMs))
+
+      val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle
+        .getValueState[String]("testState", Encoders.STRING)
+        .asInstanceOf[ValueStateImplWithTTL[String]]
+      ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+
+      // ensure get does not return the expired value
+      assert(!nextBatchTestState.exists())
+      assert(nextBatchTestState.get() === null)
+
+      // ttl value should still exist in state
+      ttlValue = nextBatchTestState.getTTLValue()
+      assert(ttlValue.isDefined)
+      assert(ttlValue.get === expectedTtlExpirationMs)
+      ttlStateValueIterator = nextBatchTestState.getValuesInTTLState()
+      assert(ttlStateValueIterator.hasNext)
+      assert(ttlStateValueIterator.next() === expectedTtlExpirationMs)
+      assert(ttlStateValueIterator.isEmpty)
+
+      // getWithoutTTL should still return the expired value
+      assert(nextBatchTestState.getWithoutEnforcingTTL().get === "v1")
+
+      nextBatchTestState.clear()
+      assert(!nextBatchTestState.exists())
+      assert(nextBatchTestState.get() === null)
+
+      nextBatchTestState.clear()
+      assert(!nextBatchTestState.exists())
+      assert(nextBatchTestState.get() === null)
+    }
+  }
+
+  test("test Value state TTL for event time") {

Review Comment:
   Could we combine this with the test above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to