HeartSaVioR commented on code in PR #48728:
URL: https://github.com/apache/spark/pull/48728#discussion_r1827273630
##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##########
@@ -137,14 +154,10 @@ private[sql] trait StatefulProcessorHandle extends
Serializable {
* and will be eventually removed from the state.
*
* The user must ensure to call this function only within the `init()`
method of the
- * StatefulProcessor.
+ * StatefulProcessor. Note that this API uses the implicit SQL encoder in
Scala.
Review Comment:
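ditto: instead of this note, prefix the method doc with `(Scala-specific) `, as we do for Scala APIs in Dataset.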
##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##########
@@ -30,58 +30,72 @@ import org.apache.spark.sql.Encoder
private[sql] trait StatefulProcessorHandle extends Serializable {
/**
- * Function to create new or return existing single value state variable of
given type. The user
- * must ensure to call this function only within the `init()` method of the
StatefulProcessor.
+ * Function to create new or return existing single value state variable of
given type with ttl.
+ * State values will not be returned past ttlDuration, and will be
eventually removed from the
+ * state store. Any state update resets the ttl to current processing time
plus ttlDuration.
+ *
+ * The user must ensure to call this function only within the `init()`
method of the
+ * StatefulProcessor.
*
* @param stateName
* \- name of the state variable
* @param valEncoder
* \- SQL encoder for state variable
+ * @param ttlConfig
+ * \- the ttl configuration (time to live duration etc.)
* @tparam T
* \- type of state variable
* @return
* \- instance of ValueState of type T that can be used to store state
persistently
*/
- def getValueState[T](stateName: String, valEncoder: Encoder[T]):
ValueState[T]
+ def getValueState[T](
+ stateName: String,
+ valEncoder: Encoder[T],
+ ttlConfig: TTLConfig): ValueState[T]
/**
* Function to create new or return existing single value state variable of
given type with ttl.
* State values will not be returned past ttlDuration, and will be
eventually removed from the
* state store. Any state update resets the ttl to current processing time
plus ttlDuration.
*
* The user must ensure to call this function only within the `init()`
method of the
- * StatefulProcessor.
+ * StatefulProcessor. Note that this API uses the implicit SQL encoder in
Scala.
Review Comment:
nit: for Scala-only APIs we usually start the method doc with `(Scala-specific) ` rather than adding a note like this; see the methods in Dataset for examples.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -230,11 +205,39 @@ class StatefulProcessorHandleImpl(
}
}
- override def getListState[T](stateName: String, valEncoder: Encoder[T]):
ListState[T] = {
- verifyStateVarOperations("get_list_state", CREATED)
- val resultState = new ListStateImpl[T](store, stateName, keyEncoder,
valEncoder, metrics)
- TWSMetricsUtils.incrementMetric(metrics, "numListStateVars")
- resultState
+ override def getValueState[T](
+ stateName: String,
+ valEncoder: Encoder[T],
+ ttlConfig: TTLConfig): ValueState[T] = {
+ getValueState(stateName, ttlConfig)(valEncoder)
+ }
+
+ override def getValueState[T: Encoder](
+ stateName: String,
+ ttlConfig: TTLConfig): ValueState[T] = {
+ verifyStateVarOperations("get_value_state", CREATED)
+ val ttlEnabled = if (ttlConfig.ttlDuration != null &&
ttlConfig.ttlDuration.isZero) {
+ false
+ } else {
+ true
+ }
+
+ val stateEncoder = encoderFor[T].asInstanceOf[ExpressionEncoder[Any]]
+ val result = if (ttlEnabled) {
+ validateTTLConfig(ttlConfig, stateName)
+ assert(batchTimestampMs.isDefined)
+ val valueStateWithTTL = new ValueStateImplWithTTL[T](store, stateName,
+ keyEncoder, stateEncoder, ttlConfig, batchTimestampMs.get, metrics)
Review Comment:
nit: indent this line by 2 more spaces so it reads as a continuation of the previous line.
##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##########
@@ -90,34 +104,36 @@ private[sql] trait StatefulProcessorHandle extends
Serializable {
* get() and will be eventually removed from the state.
*
* The user must ensure to call this function only within the `init()`
method of the
- * StatefulProcessor.
+ * StatefulProcessor. Note that this API uses the implicit SQL encoder in
Scala.
Review Comment:
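ditto: instead of this note, prefix the method doc with `(Scala-specific) `, as we do for Scala APIs in Dataset.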
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala:
##########
@@ -231,7 +232,8 @@ class MapStateImplWithTTL[K, V](
// ttlExpiration
Option(retRow).flatMap { row =>
val ttlExpiration = stateTypesEncoder.decodeTtlExpirationMs(row)
- ttlExpiration.map(expiration => (stateTypesEncoder.decodeValue(row),
expiration))
+ ttlExpiration.map(expiration => (
Review Comment:
nit: shall we move the opening `(` of the tuple down to the next line? Also, please use `.map { expiration =>` so the call's parentheses aren't confused with the `(` and `)` of the tuple.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]