neilramaswamy commented on code in PR #48853:
URL: https://github.com/apache/spark/pull/48853#discussion_r1847435410


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala:
##########
@@ -19,274 +19,553 @@ package org.apache.spark.sql.execution.streaming
 import java.time.Duration
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
-import org.apache.spark.sql.execution.streaming.state.{RangeKeyScanStateEncoderSpec, StateStore}
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStore}
+import org.apache.spark.sql.streaming.TTLConfig
 import org.apache.spark.sql.types._
 
-object StateTTLSchema {
-  val TTL_VALUE_ROW_SCHEMA: StructType =
-    StructType(Array(StructField("__dummy__", NullType)))
-}
-
 /**
- * Encapsulates the ttl row information stored in [[SingleKeyTTLStateImpl]].
+ * Any state variable that wants to support TTL must implement this trait,
+ * which they can do by extending [[OneToOneTTLState]] or [[OneToManyTTLState]].
  *
- * @param groupingKey grouping key for which ttl is set
- * @param expirationMs expiration time for the grouping key
- */
-case class SingleKeyTTLRow(
-    groupingKey: UnsafeRow,
-    expirationMs: Long)
-
-/**
- * Encapsulates the ttl row information stored in [[CompositeKeyTTLStateImpl]].
+ * The only required methods here are ones relating to evicting expired and all
+ * state, via clearExpiredStateForAllKeys and clearAllStateForElementKey,
+ * respectively. How classes do this is implementation detail, but the general
+ * pattern is to use secondary indexes to make sure cleanup scans
+ * theta(records to evict), not theta(all records).
  *
- * @param groupingKey grouping key for which ttl is set
- * @param userKey user key for which ttl is set
- * @param expirationMs expiration time for the grouping key
- */
-case class CompositeKeyTTLRow(
-   groupingKey: UnsafeRow,
-   userKey: UnsafeRow,
-   expirationMs: Long)
-
-/**
- * Represents the underlying state for secondary TTL Index for a user defined
- * state variable.
+ * There are two broad patterns of implementing stateful variables, and thus
+ * there are two broad patterns for implementing TTL. The first is when there
+ * is a one-to-one mapping between an element key [1] and a value; the primary
+ * and secondary index management for this case is implemented by
+ * [[OneToOneTTLState]]. When a single element key can have multiple values,
+ * all of which can expire at their own, unique times, then
+ * [[OneToManyTTLState]] should be used.
+ *
+ * In either case, implementations need to use some sort of secondary index
+ * that orders element keys by expiration time. This base functionality
+ * is provided by methods in this trait that read/write/delete to the
+ * so-called "TTL index". It is a secondary index with the layout of
+ * (expirationMs, elementKey) -> EMPTY_ROW. The expirationMs is big-endian
+ * encoded to allow for efficient range scans to find all expired keys.
+ *
+ * TTLState (or any abstract sub-classes) should never deal with encoding or
+ * decoding UnsafeRows to and from their user-facing types. The stateful variables
+ * themselves should be doing this; all other TTLState sub-classes should be concerned
+ * only with writing, reading, and deleting UnsafeRows and their associated
+ * expirations from the primary and secondary indexes. [2]
  *
- * This state allows Spark to query ttl values based on expiration time
- * allowing efficient ttl cleanup.
+ * [1]. You might ask, why call it "element key" instead of "grouping key"?
+ *      This is because a single grouping key might have multiple elements, as in
+ *      the case of a map, which has composite keys of the form (groupingKey, mapKey).
+ *      In the case of ValueState, though, the element key is the grouping key.
+ *      To generalize to both cases, this class should always use the term elementKey.
+ *
+ * [2]. You might also ask, why design it this way? We want the TTLState abstract
+ *      sub-classes to write to both the primary and secondary indexes, since they
+ *      both need to stay in sync; co-locating the logic is cleanest.
  */
 trait TTLState {
+  // Name of the state variable, e.g. the string the user passes to get{Value/List/Map}State
+  // in the init() method of a StatefulProcessor.
+  def stateName: String
+
+  // The StateStore instance used to store the state. There is only one instance shared
+  // among the primary and secondary indexes, since it uses virtual column families
+  // to keep the indexes separate.
+  def store: StateStore

Review Comment:
   Yes, it's needed: sub-classes of `TTLState` now write to the primary and secondary indexes themselves, so they need access to the store. This is basically a Scala trait "parameter".
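
To illustrate the trait "parameter" idea, here is a minimal sketch. It relies on the fact that Scala 2 traits cannot take constructor parameters, so a trait declares abstract members and the implementing class supplies them, typically as constructor `val`s. `FakeStore`, `TtlLike`, and `ValueStateLike` are hypothetical names made up for this example; they are not Spark classes and do not mirror the real `StateStore` API.

```scala
// Hypothetical stand-in for a state store; NOT Spark's StateStore API.
final class FakeStore {
  private val data = scala.collection.mutable.Map.empty[String, String]
  def put(key: String, value: String): Unit = { data.update(key, value) }
  def get(key: String): Option[String] = data.get(key)
}

// Abstract members act as the trait's "parameters".
trait TtlLike {
  def stateName: String
  def store: FakeStore

  // Concrete logic in the trait can use the abstract members directly.
  def write(key: String, value: String): Unit =
    store.put(s"$stateName/$key", value)
}

// The implementing class satisfies the abstract defs with constructor vals.
final class ValueStateLike(val stateName: String, val store: FakeStore) extends TtlLike

object TraitParamDemo {
  def run(): Option[String] = {
    val store = new FakeStore
    val state = new ValueStateLike("counts", store)
    state.write("user-1", "42")
    store.get("counts/user-1")
  }
}
```

Because `write` lives in the trait, any logic that must touch the store can be co-located there while each concrete class decides which store instance to hand in.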

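The doc comment in the hunk above describes a TTL index laid out as (expirationMs, elementKey) -> EMPTY_ROW, with expirationMs big-endian encoded so that expired keys can be found with a range scan. The following is a rough sketch of why that encoding helps, not the PR's implementation: it assumes non-negative timestamps, uses a `java.util.TreeMap` with an unsigned byte-wise comparator as a stand-in for a byte-ordered store, and treats an entry as expired when its expiration is at or before `nowMs`. All names (`TtlIndexSketch`, `encode`, `expiredKeys`) are hypothetical.

```scala
import java.nio.ByteBuffer
import java.util.{Comparator, TreeMap}

object TtlIndexSketch {
  // Lexicographic comparison treating each byte as unsigned, the way a
  // byte-ordered key-value store would order its keys (an assumption here).
  val unsignedLex: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
    override def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n) {
        val c = (a(i) & 0xff) - (b(i) & 0xff)
        if (c != 0) return c
        i += 1
      }
      a.length - b.length
    }
  }

  // Key = 8 big-endian bytes of expirationMs followed by the element key.
  // ByteBuffer writes big-endian by default.
  def encode(expirationMs: Long, elementKey: String): Array[Byte] = {
    val keyBytes = elementKey.getBytes("UTF-8")
    ByteBuffer.allocate(8 + keyBytes.length).putLong(expirationMs).put(keyBytes).array()
  }

  def decodeElementKey(bytes: Array[Byte]): String =
    new String(bytes, 8, bytes.length - 8, "UTF-8")

  // Scans only the prefix of the index with expirationMs <= nowMs.
  def expiredKeys(index: TreeMap[Array[Byte], Array[Byte]], nowMs: Long): List[String] = {
    val upper = encode(nowMs + 1, "") // exclusive upper bound for the scan
    val it = index.headMap(upper).keySet().iterator()
    val out = List.newBuilder[String]
    while (it.hasNext) out += decodeElementKey(it.next())
    out.result()
  }

  def demo(): List[String] = {
    val index = new TreeMap[Array[Byte], Array[Byte]](unsignedLex)
    val empty = Array.emptyByteArray
    index.put(encode(300L, "c"), empty)
    index.put(encode(100L, "a"), empty)
    index.put(encode(256L, "b"), empty)
    expiredKeys(index, nowMs = 256L)
  }
}
```

With a little-endian encoding, 256 would sort before 100 under byte-wise comparison, so the scan could not stop at the first unexpired entry; big-endian keeps byte order and numeric order aligned for non-negative values.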


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

