HeartSaVioR commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1553078246


##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##########
@@ -41,6 +41,12 @@ private[sql] trait StatefulProcessorHandle extends 
Serializable {
    */
   def getValueState[T](stateName: String, valEncoder: Encoder[T]): 
ValueState[T]
 
+  def getValueState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      tTLConfig: TTLConfig): ValueState[T]

Review Comment:
   nit: ttlConfig. I get the reason for writing this as tTL, but it's less weird 
to just use ttl.
   nit2: 2 empty lines (while we are here)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -89,14 +124,29 @@ class StateTypesEncoder[GK, V](
     val value = rowToObjDeserializer.apply(reusedValRow)
     value
   }
+
+  /**
+   * Decode the ttl information out of Value row. If the ttl has
+   * not been set (-1L specifies no user defined value), the API will
+   * return None.
+   */
+  def decodeTtlExpirationMs(row: UnsafeRow): Option[Long] = {
+    val expirationMs = row.getLong(1)

Review Comment:
   If you feel like there are many rows to be matched, I'm OK with removing 
this. Actually, I see the same necessity for `encodeValue(value: V)` and 
`encodeValue(value: V, expirationMs: Long)`: they depend on the hasTtl flag to 
work properly. I'm fine with deferring to the caller to do the right thing.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -312,187 +312,109 @@ class ValueStateSuite extends StateVariableSuiteBase {
     }
   }
 
-  Seq(TTLMode.ProcessingTimeTTL(), TTLMode.EventTimeTTL()).foreach { ttlMode =>
-    test(s"test Value state TTL for $ttlMode") {
-      tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
-        val store = provider.getStore(0)
-        val timestampMs = 10
-        val handle = createHandleForTtlMode(ttlMode, store, timestampMs)
-
-        val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-          Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-        testState.update("v1")
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        var ttlValue = testState.getTTLValue()
-        assert(ttlValue.isEmpty)
-        var ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.isEmpty)
-
-        testState.clear()
-        assert(!testState.exists())
-        assert(testState.get() === null)
-
-        val ttlExpirationMs = timestampMs + 60000
-
-        if (ttlMode == TTLMode.ProcessingTimeTTL()) {
-          testState.update("v1", Duration.ofMinutes(1))
-        } else {
-          testState.update("v1", ttlExpirationMs)
-        }
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        ttlValue = testState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // increment batchProcessingTime, or watermark and ensure expired 
value is not returned
-        val nextBatchHandle = createHandleForTtlMode(ttlMode, store, 
ttlExpirationMs)
-
-        val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle
-          .getValueState[String]("testState", Encoders.STRING)
-          .asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-
-        // ensure get does not return the expired value
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        // ttl value should still exist in state
-        ttlValue = nextBatchTestState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = nextBatchTestState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // getWithoutTTL should still return the expired value
-        assert(nextBatchTestState.getWithoutEnforcingTTL().get === "v1")
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-      }
-    }
-  }
 
-  test("test TTL duration throws error for event time") {
+  test(s"test Value state TTL") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
-      val eventTimeWatermarkMs = 10
+      val timestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.EventTimeTTL(), TimeoutMode.NoTimeouts(),
-        eventTimeWatermarkMs = Some(eventTimeWatermarkMs))
+        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        batchTimestampMs = Some(timestampMs))
 
+      val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1))
       val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-        Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
+        Encoders.STRING, ttlConfig).asInstanceOf[ValueStateImplWithTTL[String]]
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+      testState.update("v1")

Review Comment:
   Wait, why is this different from the update below, after clearing the state? 
I'd expect the TTL to be set when this update happens.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -227,19 +218,17 @@ class StatefulProcessorHandleSuite extends 
StateVariableSuiteBase {
     }
   }
 
-  Seq("ProcessingTime", "EventTime").foreach { ttlMode =>
-    test(s"ttl States are populated for ttlMode=$ttlMode") {
-      tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
-        val store = provider.getStore(0)
-        val handle = new StatefulProcessorHandleImpl(store,
-          UUID.randomUUID(), keyExprEncoder, getTtlMode(ttlMode), 
TimeoutMode.NoTimeouts(),
-          batchTimestampMs = Some(10), eventTimeWatermarkMs = Some(100))
+  test(s"ttl States are populated for ttlMode=ProcessingTime") {
+    tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
+      val store = provider.getStore(0)
+      val handle = new StatefulProcessorHandleImpl(store,
+        UUID.randomUUID(), keyExprEncoder, TTLMode.ProcessingTimeTTL(), 
TimeoutMode.NoTimeouts(),
+        batchTimestampMs = Some(10))
 
-        val valueState = handle.getValueState("testState", Encoders.STRING)
+      val valueState = handle.getValueState("testState", Encoders.STRING)

Review Comment:
   Just wanted to confirm - here we don't add a TTL configuration for this 
state. Will this state still be part of the TTL states?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -312,187 +312,109 @@ class ValueStateSuite extends StateVariableSuiteBase {
     }
   }
 
-  Seq(TTLMode.ProcessingTimeTTL(), TTLMode.EventTimeTTL()).foreach { ttlMode =>
-    test(s"test Value state TTL for $ttlMode") {
-      tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
-        val store = provider.getStore(0)
-        val timestampMs = 10
-        val handle = createHandleForTtlMode(ttlMode, store, timestampMs)
-
-        val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-          Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-        testState.update("v1")
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        var ttlValue = testState.getTTLValue()
-        assert(ttlValue.isEmpty)
-        var ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.isEmpty)
-
-        testState.clear()
-        assert(!testState.exists())
-        assert(testState.get() === null)
-
-        val ttlExpirationMs = timestampMs + 60000
-
-        if (ttlMode == TTLMode.ProcessingTimeTTL()) {
-          testState.update("v1", Duration.ofMinutes(1))
-        } else {
-          testState.update("v1", ttlExpirationMs)
-        }
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        ttlValue = testState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // increment batchProcessingTime, or watermark and ensure expired 
value is not returned
-        val nextBatchHandle = createHandleForTtlMode(ttlMode, store, 
ttlExpirationMs)
-
-        val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle
-          .getValueState[String]("testState", Encoders.STRING)
-          .asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-
-        // ensure get does not return the expired value
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        // ttl value should still exist in state
-        ttlValue = nextBatchTestState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = nextBatchTestState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // getWithoutTTL should still return the expired value
-        assert(nextBatchTestState.getWithoutEnforcingTTL().get === "v1")
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-      }
-    }
-  }
 
-  test("test TTL duration throws error for event time") {
+  test(s"test Value state TTL") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
-      val eventTimeWatermarkMs = 10
+      val timestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.EventTimeTTL(), TimeoutMode.NoTimeouts(),
-        eventTimeWatermarkMs = Some(eventTimeWatermarkMs))
+        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        batchTimestampMs = Some(timestampMs))
 
+      val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1))
       val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-        Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
+        Encoders.STRING, ttlConfig).asInstanceOf[ValueStateImplWithTTL[String]]
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+      testState.update("v1")
+      assert(testState.get() === "v1")
+      assert(testState.getWithoutEnforcingTTL().get === "v1")
 
-      val ex = intercept[SparkUnsupportedOperationException] {
-        testState.update("v1", Duration.ofMinutes(1))
-      }
+      var ttlValue = testState.getTTLValue()
+      assert(ttlValue.isEmpty)
+      var ttlStateValueIterator = testState.getValuesInTTLState()
+      assert(ttlStateValueIterator.isEmpty)
 
-      checkError(
-        ex,
-        errorClass = 
"STATEFUL_PROCESSOR_CANNOT_USE_TTL_DURATION_IN_EVENT_TIME_TTL_MODE",
-        parameters = Map(
-          "operationType" -> "update",
-          "stateName" -> "testState"
-        ),
-        matchPVals = true
-      )
-    }
-  }
+      testState.clear()
+      assert(!testState.exists())
+      assert(testState.get() === null)
 
-  test("test negative TTL duration throws error") {
-    tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
-      val store = provider.getStore(0)
-      val batchTimestampMs = 10
-      val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
+      val ttlExpirationMs = timestampMs + 60000
+
+      testState.update("v1")
+      assert(testState.get() === "v1")
+      assert(testState.getWithoutEnforcingTTL().get === "v1")
+
+      ttlValue = testState.getTTLValue()
+      assert(ttlValue.isDefined)
+      assert(ttlValue.get === ttlExpirationMs)
+      ttlStateValueIterator = testState.getValuesInTTLState()
+      assert(ttlStateValueIterator.hasNext)
+      assert(ttlStateValueIterator.next() === ttlExpirationMs)
+      assert(ttlStateValueIterator.isEmpty)
+
+      // increment batchProcessingTime, or watermark and ensure expired value 
is not returned
+      val nextBatchHandle = new StatefulProcessorHandleImpl(store, 
UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
         TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
-        batchTimestampMs = Some(batchTimestampMs))
+        batchTimestampMs = Some(timestampMs))
 
-      val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-        Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
+      val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle
+        .getValueState[String]("testState", Encoders.STRING)
+        .asInstanceOf[ValueStateImplWithTTL[String]]
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
 
-      val ex = intercept[SparkUnsupportedOperationException] {
-        testState.update("v1", Duration.ofMinutes(-1))
-      }
-
-      checkError(
-        ex,
-        errorClass = "STATEFUL_PROCESSOR_TTL_VALUE_CANNOT_BE_NEGATIVE",
-        parameters = Map(
-          "operationType" -> "update",
-          "stateName" -> "testState"
-        ),
-        matchPVals = true
-      )
+      // ensure get does not return the expired value
+      assert(!nextBatchTestState.exists())
+      assert(nextBatchTestState.get() === null)
+
+      // ttl value should still exist in state
+      ttlValue = nextBatchTestState.getTTLValue()
+      assert(ttlValue.isDefined)
+      assert(ttlValue.get === ttlExpirationMs)
+      ttlStateValueIterator = nextBatchTestState.getValuesInTTLState()
+      assert(ttlStateValueIterator.hasNext)
+      assert(ttlStateValueIterator.next() === ttlExpirationMs)
+      assert(ttlStateValueIterator.isEmpty)
+
+      // getWithoutTTL should still return the expired value
+      assert(nextBatchTestState.getWithoutEnforcingTTL().get === "v1")
+
+      nextBatchTestState.clear()
+      assert(!nextBatchTestState.exists())
+      assert(nextBatchTestState.get() === null)
+
+      nextBatchTestState.clear()

Review Comment:
   nit: redundant? is there an intention to do this twice?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -312,187 +312,109 @@ class ValueStateSuite extends StateVariableSuiteBase {
     }
   }
 
-  Seq(TTLMode.ProcessingTimeTTL(), TTLMode.EventTimeTTL()).foreach { ttlMode =>
-    test(s"test Value state TTL for $ttlMode") {
-      tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
-        val store = provider.getStore(0)
-        val timestampMs = 10
-        val handle = createHandleForTtlMode(ttlMode, store, timestampMs)
-
-        val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-          Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-        testState.update("v1")
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        var ttlValue = testState.getTTLValue()
-        assert(ttlValue.isEmpty)
-        var ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.isEmpty)
-
-        testState.clear()
-        assert(!testState.exists())
-        assert(testState.get() === null)
-
-        val ttlExpirationMs = timestampMs + 60000
-
-        if (ttlMode == TTLMode.ProcessingTimeTTL()) {
-          testState.update("v1", Duration.ofMinutes(1))
-        } else {
-          testState.update("v1", ttlExpirationMs)
-        }
-        assert(testState.get() === "v1")
-        assert(testState.getWithoutEnforcingTTL().get === "v1")
-
-        ttlValue = testState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = testState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // increment batchProcessingTime, or watermark and ensure expired 
value is not returned
-        val nextBatchHandle = createHandleForTtlMode(ttlMode, store, 
ttlExpirationMs)
-
-        val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle
-          .getValueState[String]("testState", Encoders.STRING)
-          .asInstanceOf[ValueStateImplWithTTL[String]]
-        ImplicitGroupingKeyTracker.setImplicitKey("test_key")
-
-        // ensure get does not return the expired value
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        // ttl value should still exist in state
-        ttlValue = nextBatchTestState.getTTLValue()
-        assert(ttlValue.isDefined)
-        assert(ttlValue.get === ttlExpirationMs)
-        ttlStateValueIterator = nextBatchTestState.getValuesInTTLState()
-        assert(ttlStateValueIterator.hasNext)
-        assert(ttlStateValueIterator.next() === ttlExpirationMs)
-        assert(ttlStateValueIterator.isEmpty)
-
-        // getWithoutTTL should still return the expired value
-        assert(nextBatchTestState.getWithoutEnforcingTTL().get === "v1")
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-
-        nextBatchTestState.clear()
-        assert(!nextBatchTestState.exists())
-        assert(nextBatchTestState.get() === null)
-      }
-    }
-  }
 
-  test("test TTL duration throws error for event time") {
+  test(s"test Value state TTL") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
       val store = provider.getStore(0)
-      val eventTimeWatermarkMs = 10
+      val timestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TTLMode.EventTimeTTL(), TimeoutMode.NoTimeouts(),
-        eventTimeWatermarkMs = Some(eventTimeWatermarkMs))
+        TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+        batchTimestampMs = Some(timestampMs))
 
+      val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1))
       val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-        Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
+        Encoders.STRING, ttlConfig).asInstanceOf[ValueStateImplWithTTL[String]]
       ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+      testState.update("v1")
+      assert(testState.get() === "v1")
+      assert(testState.getWithoutEnforcingTTL().get === "v1")
 
-      val ex = intercept[SparkUnsupportedOperationException] {
-        testState.update("v1", Duration.ofMinutes(1))
-      }
+      var ttlValue = testState.getTTLValue()
+      assert(ttlValue.isEmpty)
+      var ttlStateValueIterator = testState.getValuesInTTLState()
+      assert(ttlStateValueIterator.isEmpty)
 
-      checkError(
-        ex,
-        errorClass = 
"STATEFUL_PROCESSOR_CANNOT_USE_TTL_DURATION_IN_EVENT_TIME_TTL_MODE",
-        parameters = Map(
-          "operationType" -> "update",
-          "stateName" -> "testState"
-        ),
-        matchPVals = true
-      )
-    }
-  }
+      testState.clear()
+      assert(!testState.exists())
+      assert(testState.get() === null)
 
-  test("test negative TTL duration throws error") {
-    tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
-      val store = provider.getStore(0)
-      val batchTimestampMs = 10
-      val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
+      val ttlExpirationMs = timestampMs + 60000
+
+      testState.update("v1")
+      assert(testState.get() === "v1")
+      assert(testState.getWithoutEnforcingTTL().get === "v1")
+
+      ttlValue = testState.getTTLValue()
+      assert(ttlValue.isDefined)
+      assert(ttlValue.get === ttlExpirationMs)
+      ttlStateValueIterator = testState.getValuesInTTLState()
+      assert(ttlStateValueIterator.hasNext)
+      assert(ttlStateValueIterator.next() === ttlExpirationMs)
+      assert(ttlStateValueIterator.isEmpty)
+
+      // increment batchProcessingTime, or watermark and ensure expired value 
is not returned
+      val nextBatchHandle = new StatefulProcessorHandleImpl(store, 
UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
         TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
-        batchTimestampMs = Some(batchTimestampMs))
+        batchTimestampMs = Some(timestampMs))
 
-      val testState: ValueStateImplWithTTL[String] = 
handle.getValueState[String]("testState",
-        Encoders.STRING).asInstanceOf[ValueStateImplWithTTL[String]]
+      val nextBatchTestState: ValueStateImplWithTTL[String] = nextBatchHandle

Review Comment:
   This seems to test the case where a state is defined with TTL once, and the 
definition of TTL is removed after restarting the query. It's great we have a 
test to make sure flipping this does not bring up a correctness issue.
   
   While we are here, we do not support the opposite direction, right? Defining 
the state without TTL in the first query run, and defining TTL for that state 
during the query restart. I expect a schema compatibility error, but wanted to 
double-check. If I'm missing something and we do support that case, let's have 
a test for that as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to