HeartSaVioR commented on code in PR #48853:
URL: https://github.com/apache/spark/pull/48853#discussion_r1859275945
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala:
##########
@@ -206,6 +201,111 @@ trait TTLState {
def clearAllStateForElementKey(elementKey: UnsafeRow): Unit
}
+/**
+ * OneToOneTTLState is an implementation of [[TTLState]] that is used to manage
+ * TTL for state variables that need a single secondary index to efficiently manage
+ * records with an expiration.
+ *
+ * The primary index for state variables that can use a [[OneToOneTTLState]] has
+ * the form of: [elementKey -> (value, elementExpiration)]. You'll notice that, given
+ * a timestamp, it would take linear time to probe the primary index for all of its
+ * expired values.
+ *
+ * As a result, this class uses helper methods from [[TTLState]] to maintain the secondary
+ * index from [(elementExpiration, elementKey) -> EMPTY_ROW].
+ *
+ * For an explanation of why this structure is not always sufficient (e.g. why the class
+ * [[OneToManyTTLState]] is needed), please visit its class-doc comment.
+ */
+abstract class OneToOneTTLState(
+ stateNameArg: String,
+ storeArg: StateStore,
+ elementKeySchemaArg: StructType,
+ ttlConfigArg: TTLConfig,
+ batchTimestampMsArg: Long,
+ metricsArg: Map[String, SQLMetric]) extends TTLState {
+ override def stateName: String = stateNameArg
+ override def store: StateStore = storeArg
+ override def elementKeySchema: StructType = elementKeySchemaArg
+ override def ttlConfig: TTLConfig = ttlConfigArg
+ override def batchTimestampMs: Long = batchTimestampMsArg
+ override def metrics: Map[String, SQLMetric] = metricsArg
+
+ /**
+ * This method updates the TTL for the given elementKey to be expirationMs,
+ * updating both the primary and secondary indices if needed.
+ *
+ * Note that an elementKey may be the state variable's grouping key, _or_ it
+ * could be a composite key. MapState is an example of a state variable that
+ * has composite keys, which has the structure of the groupingKey followed by
+ * the specific key in the map. This method doesn't need to know what type of
+ * key is being used, though, since in either case, it's just an UnsafeRow.
+ *
+ * @param elementKey the key for which the TTL should be updated, which may
+ * either be the encoded grouping key, or the grouping key
+ * and some user-defined key.
+ * @param elementValue the value to update the primary index with. It is of
the
+ * form (value, expirationMs).
+ * @param expirationMs the new expiration timestamp to use for elementKey.
+ */
+ def updatePrimaryAndSecondaryIndices(
+ elementKey: UnsafeRow,
+ elementValue: UnsafeRow,
+ expirationMs: Long): Unit = {
+ val existingPrimaryValue = store.get(elementKey, stateName)
+
+ // Doesn't exist. Insert into the primary and TTL indexes.
+ if (existingPrimaryValue == null) {
+ store.put(elementKey, elementValue, stateName)
+ TWSMetricsUtils.incrementMetric(metrics, "numUpdatedStateRows")
+ insertIntoTTLIndex(expirationMs, elementKey)
+ } else {
+ // The new value and the existing one may differ in either timestamp or
in actual
+ // value. In either case, we need to update the primary index.
+ if (elementValue != existingPrimaryValue) {
+ store.put(elementKey, elementValue, stateName)
+ TWSMetricsUtils.incrementMetric(metrics, "numUpdatedStateRows")
+ }
+
+ // However, the TTL index might already have the correct mapping from
expirationMs to
Review Comment:
Ah OK, this is the way you avoid relying on the structure of the value. I'm fine
either way then. (No need to flip if you've already made the change, but feel free to
flip if you think it's better.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]