sahnib commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1548239064
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -77,14 +78,23 @@ class StatefulProcessorHandleImpl(
store: StateStore,
runId: UUID,
keyEncoder: ExpressionEncoder[Any],
+ ttlMode: TTLMode,
timeoutMode: TimeoutMode,
- isStreaming: Boolean = true)
+ isStreaming: Boolean = true,
+ batchTimestampMs: Option[Long] = None,
+ eventTimeWatermarkMs: Option[Long] = None)
extends StatefulProcessorHandle with Logging {
import StatefulProcessorHandleState._
+ /**
+ * Stores all the active ttl states, and is used to cleanup expired values
+ * in [[doTtlCleanup()]] function.
+ */
+ private val ttlStates: util.List[TTLState] = new util.ArrayList[TTLState]()
Review Comment:
No particular reason. I wanted to use a mutable data structure. A Scala List
would work fine, since we can prepend to it. I don't think we need to use a
Seq, as appending to it would be less performant. Thoughts?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala:
##########
@@ -16,39 +16,45 @@
*/
package org.apache.spark.sql.execution.streaming
+import java.time.Duration
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import
org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA,
VALUE_ROW_SCHEMA}
-import
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec,
StateStore}
+import
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA,
VALUE_ROW_SCHEMA}
+import
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec,
StateStore, StateStoreErrors}
import org.apache.spark.sql.streaming.ValueState
/**
* Class that provides a concrete implementation for a single value state
associated with state
* variables used in the streaming transformWithState operator.
* @param store - reference to the StateStore instance to be used for storing
state
* @param stateName - name of logical state partition
- * @param keyEnc - Spark SQL encoder for key
+ * @param keyExprEnc - Spark SQL encoder for key
* @param valEncoder - Spark SQL encoder for value
* @tparam S - data type of object that will be stored
*/
class ValueStateImpl[S](
store: StateStore,
stateName: String,
keyExprEnc: ExpressionEncoder[Any],
- valEncoder: Encoder[S]) extends ValueState[S] with Logging {
+ valEncoder: Encoder[S])
+ extends ValueState[S] with Logging {
private val keySerializer = keyExprEnc.createSerializer()
-
private val stateTypesEncoder = StateTypesEncoder(keySerializer, valEncoder,
stateName)
+ private[sql] var ttlState: Option[SingleKeyTTLStateImpl] = None
+
+ initialize()
Review Comment:
Just part of refactoring.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]