anishshri-db commented on code in PR #48728:
URL: https://github.com/apache/spark/pull/48728#discussion_r1828225150


##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##########
@@ -30,58 +30,72 @@ import org.apache.spark.sql.Encoder
 private[sql] trait StatefulProcessorHandle extends Serializable {
 
   /**
-   * Function to create new or return existing single value state variable of given type. The user
-   * must ensure to call this function only within the `init()` method of the StatefulProcessor.
+   * Function to create new or return existing single value state variable of given type with ttl.
+   * State values will not be returned past ttlDuration, and will be eventually removed from the
+   * state store. Any state update resets the ttl to current processing time plus ttlDuration.
+   *
+   * The user must ensure to call this function only within the `init()` method of the
+   * StatefulProcessor.
    *
    * @param stateName
    *   \- name of the state variable
    * @param valEncoder
    *   \- SQL encoder for state variable
+   * @param ttlConfig
+   *   \- the ttl configuration (time to live duration etc.)
    * @tparam T
    *   \- type of state variable
    * @return
    *   \- instance of ValueState of type T that can be used to store state persistently
    */
-  def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]
+  def getValueState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      ttlConfig: TTLConfig): ValueState[T]
 
   /**
    * Function to create new or return existing single value state variable of given type with ttl.
    * State values will not be returned past ttlDuration, and will be eventually removed from the
    * state store. Any state update resets the ttl to current processing time plus ttlDuration.
    *
    * The user must ensure to call this function only within the `init()` method of the
-   * StatefulProcessor.
+   * StatefulProcessor. Note that this API uses the implicit SQL encoder in Scala.

Review Comment:
   Done
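
   For readers of this thread, the ttl semantics described in the doc comment above (values are not returned past ttlDuration, are removed eventually, and every update resets the expiration to current processing time plus ttlDuration) can be illustrated with a small standalone sketch. This is not the Spark implementation: `TtlValueSketch`, `nowMs`, and `evictExpired` are hypothetical names, and treating the expiration instant itself as already expired is an assumption.

   ```scala
   import java.time.Duration

   // Hypothetical, simplified model of a single value state with ttl.
   // None of these names come from Spark; `nowMs` stands in for processing time.
   final class TtlValueSketch[T](ttl: Duration) {
     // Stored as (value, expirationMs).
     private var entry: Option[(T, Long)] = None

     // Every update resets the expiration to current processing time plus ttl.
     def update(value: T, nowMs: Long): Unit =
       entry = Some((value, nowMs + ttl.toMillis))

     // Expired values are never returned, even if not yet physically removed.
     // Assumption: a value is considered expired at exactly its expiration time.
     def get(nowMs: Long): Option[T] =
       entry.collect { case (v, expMs) if nowMs < expMs => v }

     // Eventual cleanup, run separately from reads.
     def evictExpired(nowMs: Long): Unit =
       if (entry.exists { case (_, expMs) => nowMs >= expMs }) entry = None
   }

   object TtlValueSketchDemo extends App {
     val state = new TtlValueSketch[String](Duration.ofSeconds(10))
     state.update("a", nowMs = 0L)
     assert(state.get(nowMs = 9999L).contains("a"))
     state.update("b", nowMs = 9000L) // expiration moves to 19000 ms
     assert(state.get(nowMs = 15000L).contains("b"))
     assert(state.get(nowMs = 19000L).isEmpty)
   }
   ```

   The read path filters on expiration rather than relying on cleanup having run, which is one way to satisfy "will not be returned past ttlDuration" while removal stays eventual.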



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala:
##########
@@ -231,7 +232,8 @@ class MapStateImplWithTTL[K, V](
     // ttlExpiration
     Option(retRow).flatMap { row =>
       val ttlExpiration = stateTypesEncoder.decodeTtlExpirationMs(row)
-      ttlExpiration.map(expiration => (stateTypesEncoder.decodeValue(row), expiration))
+      ttlExpiration.map(expiration => (

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]