anishshri-db commented on code in PR #48853:
URL: https://github.com/apache/spark/pull/48853#discussion_r1845913571
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala:
##########
@@ -19,274 +19,553 @@ package org.apache.spark.sql.execution.streaming
import java.time.Duration
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.metric.SQLMetric
import
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
-import
org.apache.spark.sql.execution.streaming.state.{RangeKeyScanStateEncoderSpec,
StateStore}
+import
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec,
RangeKeyScanStateEncoderSpec, StateStore}
+import org.apache.spark.sql.streaming.TTLConfig
import org.apache.spark.sql.types._
-object StateTTLSchema {
- val TTL_VALUE_ROW_SCHEMA: StructType =
- StructType(Array(StructField("__dummy__", NullType)))
-}
-
/**
- * Encapsulates the ttl row information stored in [[SingleKeyTTLStateImpl]].
+ * Any state variable that wants to support TTL must implement this trait,
+ * which they can do by extending [[OneToOneTTLState]] or
[[OneToManyTTLState]].
*
- * @param groupingKey grouping key for which ttl is set
- * @param expirationMs expiration time for the grouping key
- */
-case class SingleKeyTTLRow(
- groupingKey: UnsafeRow,
- expirationMs: Long)
-
-/**
- * Encapsulates the ttl row information stored in [[CompositeKeyTTLStateImpl]].
+ * The only required methods here are ones relating to evicting expired and all
+ * state, via clearExpiredStateForAllKeys and clearAllStateForElementKey,
+ * respectively. How classes do this is implementation detail, but the general
+ * pattern is to use secondary indexes to make sure cleanup scans
+ * theta(records to evict), not theta(all records).
*
- * @param groupingKey grouping key for which ttl is set
- * @param userKey user key for which ttl is set
- * @param expirationMs expiration time for the grouping key
- */
-case class CompositeKeyTTLRow(
- groupingKey: UnsafeRow,
- userKey: UnsafeRow,
- expirationMs: Long)
-
-/**
- * Represents the underlying state for secondary TTL Index for a user defined
- * state variable.
+ * There are two broad patterns of implementing stateful variables, and thus
+ * there are two broad patterns for implementing TTL. The first is when there
+ * is a one-to-one mapping between an element key [1] and a value; the primary
+ * and secondary index management for this case is implemented by
+ * [[OneToOneTTLState]]. When a single element key can have multiple values,
+ * all at which can expire at their own, unique times, then
Review Comment:
nit: `all of which can`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]