HeartSaVioR commented on code in PR #48853:
URL: https://github.com/apache/spark/pull/48853#discussion_r1858566554


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala:
##########
@@ -206,6 +201,111 @@ trait TTLState {
   def clearAllStateForElementKey(elementKey: UnsafeRow): Unit
 }
 
+/**
+ * OneToOneTTLState is an implementation of [[TTLState]] that is used to manage
+ * TTL for state variables that need a single secondary index to efficiently manage
+ * records with an expiration.
+ *
+ * The primary index for state variables that can use a [[OneToOneTTLState]] have
+ * the form of: [elementKey -> (value, elementExpiration)]. You'll notice that, given
+ * a timestamp, it would take linear time to probe the primary index for all of its
+ * expired values.
+ *
+ * As a result, this class uses helper methods from [[TTLState]] to maintain the secondary
+ * index from [(elementExpiration, elementKey) -> EMPTY_ROW].
+ *
+ * For an explanation of why this structure is not always sufficient (e.g. why the class
+ * [[OneToManyTTLState]] is needed), please visit its class-doc comment.
+ */
+abstract class OneToOneTTLState(
+    stateNameArg: String,
+    storeArg: StateStore,
+    elementKeySchemaArg: StructType,
+    ttlConfigArg: TTLConfig,
+    batchTimestampMsArg: Long,
+    metricsArg: Map[String, SQLMetric]) extends TTLState {
+  override def stateName: String = stateNameArg
+  override def store: StateStore = storeArg
+  override def elementKeySchema: StructType = elementKeySchemaArg
+  override def ttlConfig: TTLConfig = ttlConfigArg
+  override def batchTimestampMs: Long = batchTimestampMsArg
+  override def metrics: Map[String, SQLMetric] = metricsArg
+
+  /**
+   * This method updates the TTL for the given elementKey to be expirationMs,
+   * updating both the primary and secondary indices if needed.
+   *
+   * Note that an elementKey may be the state variable's grouping key, _or_ it
+   * could be a composite key. MapState is an example of a state variable that
+   * has composite keys, which has the structure of the groupingKey followed by
+   * the specific key in the map. This method doesn't need to know what type of
+   * key is being used, though, since in either case, it's just an UnsafeRow.
+   *
+   * @param elementKey the key for which the TTL should be updated, which may
+   *                   either be the encoded grouping key, or the grouping key
+   *                   and some user-defined key.
+   * @param elementValue the value to update the primary index with. It is of the
+   *                     form (value, expirationMs).
+   * @param expirationMs the new expiration timestamp to use for elementKey.
+   */
+  def updatePrimaryAndSecondaryIndices(
+      elementKey: UnsafeRow,
+      elementValue: UnsafeRow,
+      expirationMs: Long): Unit = {
+    val existingPrimaryValue = store.get(elementKey, stateName)
+
+    // Doesn't exist. Insert into the primary and TTL indexes.
+    if (existingPrimaryValue == null) {
+      store.put(elementKey, elementValue, stateName)
+      TWSMetricsUtils.incrementMetric(metrics, "numUpdatedStateRows")
+      insertIntoTTLIndex(expirationMs, elementKey)
+    } else {
+      // The new value and the existing one may differ in either timestamp or in actual
+      // value. In either case, we need to update the primary index.
+      if (elementValue != existingPrimaryValue) {
+        store.put(elementKey, elementValue, stateName)
+        TWSMetricsUtils.incrementMetric(metrics, "numUpdatedStateRows")
+      }
+
+      // However, the TTL index might already have the correct mapping from expirationMs to

Review Comment:
   This assumes the TTL index is in sync with the primary index. As I mentioned in the previous comment, there is no case where
   
   * `elementValue == existingPrimaryValue`
   * but `existingExpirationMs != expirationMs`
   
   as long as elementValue properly contains expirationMs. So if `elementValue == existingPrimaryValue`, we really don't need to do anything.



-- 
This is an automated message from the Apache Git Service.