Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-13 Thread via GitHub


HeartSaVioR closed pull request #45051: [SPARK-46913][SS] Add support for 
processing/event time based timers with transformWithState operator
URL: https://github.com/apache/spark/pull/45051





Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-13 Thread via GitHub


HeartSaVioR commented on PR #45051:
URL: https://github.com/apache/spark/pull/45051#issuecomment-1995685935

   Thanks! Merging to master.





Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-12 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1522327291


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+      key: Any,
+      expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+    store: StateStore,
+    timeoutMode: TimeoutMode,
+    keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+    .add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+    .add("key", BinaryType)
+    .add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+    .add("expiryTimestampMs", BinaryType, nullable = false)
+    .add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+    StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+    TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+    TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+    schemaForKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+    keySchemaForSecIndex, numColsPrefixKey = 0,
+    schemaForValueRow, useMultipleValuesPerKey = false,
+    isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+    }
+
+    val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+    val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+    keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at any given
+  //  processing time/event time timestamp threshold by performing a range scan.

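A minimal, self-contained Scala sketch (illustration only, not part of the patch) of the property the comment above relies on: encoding the expiry timestamp in BIG_ENDIAN byte order makes a byte-wise comparator, like RocksDB's default one, sort timers by expiry, so reading expired timers at a threshold reduces to a range scan.

import java.nio.{ByteBuffer, ByteOrder}

object BigEndianTimerOrderingSketch {
  // Encode a timestamp the way the secondary index does: fixed-width BIG_ENDIAN bytes.
  def encodeTs(ts: Long): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).order(ByteOrder.BIG_ENDIAN).putLong(ts).array()

  // Unsigned lexicographic comparison, mirroring RocksDB's default comparator.
  val byteWise: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    override def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val len = math.min(a.length, b.length)
      var i = 0
      while (i < len && a(i) == b(i)) i += 1
      if (i < len) java.lang.Byte.toUnsignedInt(a(i)) - java.lang.Byte.toUnsignedInt(b(i))
      else a.length - b.length
    }
  }

  def main(args: Array[String]): Unit = {
    // Secondary-index keys put the encoded timestamp first, then the grouping key.
    val timers = Seq(9000L -> "b", 12000L -> "c", 5000L -> "a")
    val indexKeys = timers.map { case (ts, key) => encodeTs(ts) ++ key.getBytes("UTF-8") }

    // Byte-wise sorting recovers numeric expiry order: 5000 ("a"), 9000 ("b"), 12000 ("c").
    val sortedKeys = indexKeys.sorted(byteWise)

    // Expired timers at threshold 10000 ms form a contiguous prefix of the sorted keys.
    val threshold = encodeTs(10000L)
    val expired = sortedKeys.takeWhile(k => byteWise.compare(k.take(8), threshold) < 0)
    expired.foreach(k => println(new String(k.drop(8), "UTF-8"))) // prints "a" then "b"
  }
}

For non-negative longs this holds because the most significant byte comes first; the patch applies the same idea by placing the encoded timestamp before the grouping key in the tsToKey column family.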
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-12 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1522188039


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
       Iterator((key, count.toString))
     }
   }
+}
+
+// Class to verify stateful processor usage with adding processing time timers
+class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor {
+  private def handleProcessingTimeBasedTimers(
+      key: String,
+      expiryTimestampMs: Long): Iterator[(String, String)] = {
+    _countState.clear()
+    Iterator((key, "-1"))
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
+
+    if (expiredTimerInfo.isValid()) {
+      handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs())
+    } else {
+      val currCount = _countState.getOption().getOrElse(0L)
+      if (currCount == 0 && (key == "a" || key == "c")) {
+        _processorHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs()
+          + 5000)
+      }
+
+      val count = currCount + 1
+      if (count == 3) {
+        _countState.clear()
+        Iterator.empty
+      } else {
+        _countState.update(count)
+        Iterator((key, count.toString))
+      }
+    }
+  }
+}

-  override def close(): Unit = {}
+// Class to verify stateful processor usage with adding/deleting processing time timers
+class RunningCountStatefulProcessorWithAddRemoveProcTimeTimer
+  extends RunningCountStatefulProcessor {
+  @transient private var _timerState: ValueState[Long] = _
+
+  override def init(
+      handle: StatefulProcessorHandle,
+      outputMode: OutputMode,
+      timeoutMode: TimeoutMode): Unit = {
+    super.init(handle, outputMode, timeoutMode)
+    _timerState = _processorHandle.getValueState[Long]("timerState")
+  }
+
+  private def handleProcessingTimeBasedTimers(
+      key: String,
+      expiryTimestampMs: Long): Iterator[(String, String)] = {
+    _timerState.clear()
+    Iterator((key, "-1"))
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
+    if (expiredTimerInfo.isValid()) {
+      handleProcessingTimeBasedTimers(key, expiredTimerInfo.getExpiryTimeInMs())
+    } else {
+      val currCount = _countState.getOption().getOrElse(0L)
+      val count = currCount + inputRows.size
+      _countState.update(count)
+      if (key == "a") {
+        var nextTimerTs: Long = 0L
+        if (currCount == 0) {
+          nextTimerTs = timerValues.getCurrentProcessingTimeInMs() + 5000
+          _processorHandle.registerTimer(nextTimerTs)
+          _timerState.update(nextTimerTs)
+        } else if (currCount == 1) {
+          _processorHandle.deleteTimer(_timerState.get())
Review Comment:
   Yes - added it in a different test






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-12 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1522187745


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -195,6 +327,115 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }

+  test("transformWithState - streaming with rocksdb and processing time timer " +
+    "should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("b", "1")),
+
+        AddData(inputData, "b"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("b", "2")),
+
+        StopStream,
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "b"),
+        AddData(inputData, "c"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("c", "1")),
+        AddData(inputData, "d"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("c", "-1"), ("d", "1")),
+        StopStream
+      )
+    }
+  }
+
+  test("transformWithState - streaming with rocksdb and processing time timer " +
+    "and add/remove timers should succeed") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val clock = new StreamManualClock
+
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(
+          new RunningCountStatefulProcessorWithAddRemoveProcTimeTimer(),
+          TimeoutMode.ProcessingTime(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "a"),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer(("a", "1")),
+
+        AddData(inputData, "a"),
+        AdvanceManualClock(2 * 1000),
+        CheckNewAnswer(("a", "2")),
+        StopStream,
+
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, "d"),
+        AdvanceManualClock(10 * 1000),
+        CheckNewAnswer(("a", "-1"), ("d", "1")),

Review Comment:
   Done



##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -53,8 +57,134 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
       Iterator((key, count.toString))
     }
   }
+}
+
+// Class to verify stateful processor usage with adding processing time timers

Review Comment:
   Done






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520644679


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -163,6 +249,16 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver

+    timeoutMode match {
+      case ProcessingTime =>
+        require(batchTimestampMs.nonEmpty)

Review Comment:
   Done






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520644342


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(

   override def getQueryInfo(): QueryInfo = currQueryInfo

+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {
+      throw StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString)
+    }
+
+    if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) {
+      throw StateStoreErrors.cannotUseTimersWithInvalidHandleState(currState.toString)
+    }
+
+    if (timerState.exists(expiryTimestampMs)) {
+      logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")

Review Comment:
   Done






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520520070


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -103,8 +116,12 @@ case class TransformWithStateExec(
     val keyObj = getKeyObj(keyRow)  // convert key to objects
     ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
     val valueObjIter = valueRowIter.map(getValueObj.apply)
-    val mappedIterator = statefulProcessor.handleInputRows(keyObj, valueObjIter,
-      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj =>
+    val mappedIterator = statefulProcessor.handleInputRows(
+      keyObj,
+      valueObjIter,
+      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents),

Review Comment:
   Done - updated






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520519676


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(

   override def getQueryInfo(): QueryInfo = currQueryInfo

+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {
+      throw StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString)
+    }
+
+    if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) {

Review Comment:
   Well - we don't want to allow timer creation when the `currState` is `CREATED` 
either. Updated the condition a little to make it more readable. Please take a 
look.
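
A small standalone sketch of the intent, with hypothetical state names standing in for the handle's internal ones (the real enum and error helpers live in StatefulProcessorHandleImpl and StateStoreErrors): timer operations are only legal once the handle is initialized and while data is being processed, so CREATED and CLOSED are both rejected.

object HandleStateCheckSketch {
  // Hypothetical handle lifecycle states for illustration.
  sealed trait HandleState
  case object CREATED extends HandleState
  case object INITIALIZED extends HandleState
  case object DATA_PROCESSED extends HandleState
  case object CLOSED extends HandleState

  // States in which user code may register or delete timers.
  private val statesAllowingTimerOps: Set[HandleState] = Set(INITIALIZED, DATA_PROCESSED)

  def verifyTimerOperations(currState: HandleState): Unit = {
    if (!statesAllowingTimerOps.contains(currState)) {
      throw new IllegalStateException(s"Cannot use timers in handle state=$currState")
    }
  }

  def main(args: Array[String]): Unit = {
    verifyTimerOperations(INITIALIZED) // allowed
    try verifyTimerOperations(CREATED) // rejected, per the review discussion
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}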






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1520472228


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {

Review Comment:
   Done



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {

Review Comment:
   Done






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519943584


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -103,8 +116,12 @@ case class TransformWithStateExec(
     val keyObj = getKeyObj(keyRow)  // convert key to objects
     ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
     val valueObjIter = valueRowIter.map(getValueObj.apply)
-    val mappedIterator = statefulProcessor.handleInputRows(keyObj, valueObjIter,
-      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj =>
+    val mappedIterator = statefulProcessor.handleInputRows(
+      keyObj,
+      valueObjIter,
+      new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents),

Review Comment:
   I discovered this as well and updated it to `eventTimeForEviction` in my PR: 
https://github.com/apache/spark/pull/45376/files#diff-2bac1c42eb2edac75b4d725015d7a690269eb0869389e0347b8b6c01d222
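
To make the distinction concrete, a toy Scala sketch with hypothetical values, assuming (as I understand the semantics) that the late-events watermark trails the eviction watermark by one microbatch:

object WatermarkChoiceSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical watermark values (ms) for a single microbatch.
    val eventTimeWatermarkForLateEvents = Some(15000L) // watermark before this batch ran
    val eventTimeForEviction = Some(20000L)            // watermark after this batch ran

    // A timer is expired once the watermark has passed its expiry timestamp.
    def isExpired(timerTs: Long, watermark: Option[Long]): Boolean =
      watermark.exists(timerTs <= _)

    // A timer set for 18000 ms should fire in this batch, which only the
    // eviction watermark detects.
    println(isExpired(18000L, eventTimeForEviction))            // true
    println(isExpired(18000L, eventTimeWatermarkForLateEvents)) // false
  }
}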






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519251919


[Quoted hunk from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala omitted; it duplicates the hunk quoted earlier in this thread. The review comment body is missing from the archive.]

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519192303


[Quoted TimerStateImpl.scala hunk omitted; it duplicates the hunk quoted earlier in this thread. The review comment body is missing from the archive.]

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519187563


[Quoted TimerStateImpl.scala hunk omitted; it duplicates the hunk quoted earlier in this thread. The review comment body is missing from the archive.]

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519185494


[Quoted TimerStateImpl.scala hunk omitted; it duplicates the hunk quoted earlier in this thread. The review comment body is missing from the archive.]

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519178347


[Quoted TimerStateImpl.scala hunk omitted; it duplicates the hunk quoted earlier in this thread. The review comment body is missing from the archive.]

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519006106


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = 
UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at
+  //  any given processing time/event time timestamp threshold by performing a range
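
To make the BIG_ENDIAN remark above concrete, here is a minimal, self-contained
sketch of the ordering property it relies on. This is illustrative only, not
code from the PR; every name in it is invented:

import java.nio.{ByteBuffer, ByteOrder}

object BigEndianOrderingSketch {
  // Encode a non-negative millisecond timestamp as 8 big-endian bytes. For
  // non-negative longs, byte-wise comparison of this encoding agrees with
  // numeric comparison, so a byte-wise comparator keeps the index sorted by
  // expiry timestamp.
  def encode(tsMs: Long): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).order(ByteOrder.BIG_ENDIAN).putLong(tsMs).array()

  // Unsigned lexicographic byte comparison, mimicking RocksDB's default comparator.
  def byteWiseCompare(a: Array[Byte], b: Array[Byte]): Int = {
    var i = 0
    while (i < math.min(a.length, b.length)) {
      val c = (a(i) & 0xff) - (b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  def main(args: Array[String]): Unit = {
    val timestamps = Seq(1710000000123L, 999L, 1710000000122L)
    val recovered = timestamps.map(encode)
      .sortWith(byteWiseCompare(_, _) < 0)
      .map(b => ByteBuffer.wrap(b).getLong()) // ByteBuffer reads big-endian by default
    println(recovered) // List(999, 1710000000122, 1710000000123)
  }
}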

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519009915


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at
+  //  any given processing time/event time timestamp threshold by performing a range
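
A rough mental model of the two column families registered above (keyToTsCFName
and tsToKeyCFName), sketched with plain in-memory collections. This is a
stand-in under assumed semantics, not the PR's storage code, and the names are
invented:

import scala.collection.immutable.{SortedMap, TreeMap}

// keyToTs models the (groupingKey, expiryTimestampMs) family used for
// per-key lookups; tsToKey models the inverted (expiryTimestampMs,
// groupingKey) family kept globally sorted by expiry.
case class TimerIndexSketch(
    keyToTs: Set[(String, Long)] = Set.empty,
    tsToKey: SortedMap[(Long, String), Unit] = TreeMap.empty) {

  // Registering a timer writes to both indexes so they stay consistent.
  def register(key: String, expiryMs: Long): TimerIndexSketch =
    TimerIndexSketch(keyToTs + ((key, expiryMs)), tsToKey.updated((expiryMs, key), ()))

  // Deleting a timer must remove the entry from both indexes as well.
  def delete(key: String, expiryMs: Long): TimerIndexSketch =
    TimerIndexSketch(keyToTs - ((key, expiryMs)), tsToKey.removed((expiryMs, key)))
}

Registering the same (key, timestamp) pair twice is naturally idempotent in
this model, and a delete touches both directions so neither index can drift.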

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519006106


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at
+  //  any given processing time/event time timestamp threshold by performing a range
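
The guard at the top of encodeKey assumes the operator has already installed
the current grouping key before user code runs. A simplified, hypothetical
stand-in for that handoff follows; the real ImplicitGroupingKeyTracker in the
PR may be implemented differently:

object ImplicitKeySketch {
  // The operator sets the grouping key on the executing thread before
  // invoking user code; timer APIs then read it implicitly instead of
  // taking the key as an explicit argument.
  private val currentKey = new ThreadLocal[Option[Any]] {
    override def initialValue(): Option[Any] = None
  }

  def setImplicitKey(key: Any): Unit = currentKey.set(Some(key))
  def removeImplicitKey(): Unit = currentKey.set(None)

  // Mirrors the failure mode in encodeKey: fail fast when no key is set.
  def requireKey(): Any = currentKey.get().getOrElse(
    throw new IllegalStateException("implicit grouping key not found"))
}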

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


neilramaswamy commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1518995350


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at
+  //  any given processing time/event time timestamp threshold by performing a range
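
The inverted (timestamp, key) layout is what makes expiry processing cheap:
every timer with expiry at or below a threshold forms a contiguous prefix of
the sorted index, so one ordered scan that stops at the first non-expired
entry suffices. A self-contained sketch of that scan, with a SortedMap
standing in for the RocksDB column family and invented names throughout:

import scala.collection.immutable.SortedMap

object ExpiryScanSketch {
  // tsToKey stands in for the sorted (expiryTimestampMs, groupingKey) family.
  // takeWhile stops at the first entry past the threshold, so no full scan
  // over all timers is needed.
  def expiredTimers(
      tsToKey: SortedMap[(Long, String), Unit],
      thresholdMs: Long): Iterator[(String, Long)] =
    tsToKey.keysIterator
      .takeWhile { case (ts, _) => ts <= thresholdMs }
      .map { case (ts, key) => (key, ts) }
}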

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-10 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1518795524


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at
+  //  any given processing time/event time timestamp threshold by performing a range
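
For the other direction, the forward family created above with
numColsPrefixKey = 1 makes the grouping key the prefix column, so "all timers
registered for the current key" is a prefix scan rather than a scan of the
whole store. A filter-based sketch of that lookup, again with invented names
rather than the PR's actual scan API:

object PrefixScanSketch {
  // keyToTs stands in for the (groupingKey, expiryTimestampMs) family; the
  // backtick pattern matches only entries whose first column equals `key`,
  // the moral equivalent of a prefix scan over the column family.
  def timersForKey(keyToTs: Set[(String, Long)], key: String): Iterator[Long] =
    keyToTs.iterator.collect { case (`key`, expiryMs) => expiryMs }
}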

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-09 Thread via GitHub


neilramaswamy commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1518738187


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at any given
+  //  processing time/event time timestamp threshold by performing a range 
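As a mental model for the two column families created above, a primary index keyed by (grouping key, timestamp) for point lookups plus a sorted secondary index keyed by (timestamp, grouping key) for range scans, here is a simplified in-memory analogue. It is illustrative only; the RocksDB-backed implementation stores encoded rows, not Scala collections:

import scala.collection.immutable.SortedSet
import scala.collection.mutable

// Simplified analogue of the keyToTs and tsToKey column families.
class TimerIndex[K: Ordering] {
  private val keyToTs = mutable.Map.empty[K, Set[Long]] // point lookups: exists(key, ts)
  private var tsToKey = SortedSet.empty[(Long, K)]      // range scans: expired up to T

  def register(key: K, ts: Long): Unit = {
    keyToTs.update(key, keyToTs.getOrElse(key, Set.empty) + ts)
    tsToKey += ((ts, key))
  }

  def delete(key: K, ts: Long): Unit = {
    keyToTs.update(key, keyToTs.getOrElse(key, Set.empty) - ts)
    tsToKey -= ((ts, key))
  }

  def exists(key: K, ts: Long): Boolean =
    keyToTs.getOrElse(key, Set.empty).contains(ts)

  // Expired timers up to the threshold, in timestamp order (the range scan).
  def expiredUpTo(thresholdMs: Long): Iterator[(Long, K)] =
    tsToKey.iterator.takeWhile(_._1 <= thresholdMs)
}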

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-09 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1518723182


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at any given
+  //  processing time/event time timestamp threshold by performing a range 

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-09 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1518698125


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at any given
+  //  processing time/event time timestamp threshold by performing a range 

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-09 Thread via GitHub


neilramaswamy commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1518669207


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default comparator to
+  //  determine sorted order of keys. This is used to read expired timers at any given
+  //  processing time/event time timestamp threshold by performing a range 

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1517217945


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+StateStore.stop()
+require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+StateStore.stop()
+require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+RocksDBStateStoreProvider = {
+newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+  numColsPrefixKey = 0,
+  useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+storeId: StateStoreId,
+numColsPrefixKey: Int,
+sqlConf: Option[SQLConf] = None,
+conf: Configuration = new Configuration,
+useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+val provider = new RocksDBStateStoreProvider()
+provider.init(
+  storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey,
+  useColumnFamilies,
+  new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+provider
+  }
+
+  private def tryWithProviderResource[T](
+provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+try {
+  f(provider)
+} finally {
+  provider.close()
+}
+  }
+
+  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
+timeoutMode match {
+  case "NoTimeouts" => TimeoutMode.NoTimeouts()
+  case "ProcessingTime" => TimeoutMode.ProcessingTime()
+  case "EventTime" => TimeoutMode.EventTime()
+  case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode")
+}
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+test(s"value state creation with timeoutMode=$timeoutMode should succeed") {
+  tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+val store = provider.getStore(0)
+val handle = new StatefulProcessorHandleImpl(store,
+  UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
+handle.getValueState[Long]("testState")
+  }
+}
+  }
+
+  private def verifyInvalidOperation(
+  handle: StatefulProcessorHandleImpl,
+  handleState: StatefulProcessorHandleState.Value,
+  errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = {
+handle.setHandleState(handleState)
+assert(handle.getHandleState === handleState)
+val ex = intercept[Exception] {
+  fn(handle)
+}
+assert(ex.getMessage.contains(errorMsg))
+  }
+
+  private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = {
+handle.getValueState[Long]("testState")
+  }
+
+  private def registerTimer(handle: 
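A side note on the suite quoted above: the `Seq(...).foreach { ... test(...) }` wrapper generates one ScalaTest case per timeout mode. The same idiom in miniature, independent of the Spark test harness:

import org.scalatest.funsuite.AnyFunSuite

class ModesSuite extends AnyFunSuite {
  // One generated test per mode name; ScalaTest registers each test eagerly.
  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { mode =>
    test(s"handle creation works with mode=$mode") {
      assert(mode.nonEmpty) // placeholder assertion; the real suite exercises the handle
    }
  }
}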

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1517119556


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+/**
+ * Class used to provide access to expired timer's expiry time and timeout mode. These values

Review Comment:
   nit: Technically the timeout mode is not visible via the trait. If that's intentional, probably remove that part from the interface doc.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode}
+
+/**
+ * Class that provides a concrete implementation that can be used to provide access to expired
+ * timer's expiry time and timeout mode. These values are only relevant if the ExpiredTimerInfo

Review Comment:
   nit: same, timeout mode is not visible to user function AFAIK.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -103,8 +116,12 @@ case class TransformWithStateExec(
 val keyObj = getKeyObj(keyRow)  // convert key to objects
 ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
 val valueObjIter = valueRowIter.map(getValueObj.apply)
-val mappedIterator = statefulProcessor.handleInputRows(keyObj, valueObjIter,
-  new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj =>
+val mappedIterator = statefulProcessor.handleInputRows(
+  keyObj,
+  valueObjIter,
+  new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents),
+  new ExpiredTimerInfoImpl(false)

Review Comment:
   super nit / 2 cents: a named parameter for the boolean would give much better readability in a non-IDE environment. It's really more of a general suggestion and preference, so you can ignore.
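   For illustration, the suggestion amounts to something like the following; the parameter name `isValid` is assumed here, since the constructor signature is not quoted in full:
   
   new ExpiredTimerInfoImpl(isValid = false)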



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {
+  throw StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString)
+}
+
+if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) {
+  throw StateStoreErrors.cannotUseTimersWithInvalidHandleState(currState.toString)
+}
+
+if (timerState.exists(expiryTimestampMs)) {
+  logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")

Review Comment:
   I'm OK with moving the logging to TimerStateImpl if it helps to solve this.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
 
  

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-29 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1507971789


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,
+s"Cannot register timers with incorrect TimeoutMode")
+verify(currState == INITIALIZED || currState == DATA_PROCESSED,
+s"Cannot register timers with " +
+  s"expiryTimestampMs=$expiryTimestampMs in current state=$currState")
+
+if (timerState.exists(expiryTimestampMs)) {
+  logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")

Review Comment:
   Yes, that's correct. It will fire within the same microbatch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-29 Thread via GitHub


sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1507925323


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,
+s"Cannot register timers with incorrect TimeoutMode")
+verify(currState == INITIALIZED || currState == DATA_PROCESSED,
+s"Cannot register timers with " +
+  s"expiryTimestampMs=$expiryTimestampMs in current state=$currState")
+
+if (timerState.exists(expiryTimestampMs)) {
+  logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")

Review Comment:
   As we process timers after the data, this would result in firing the timer right away in the same micro-batch after the data is processed. I think that is fine, just pointing it out.
   
   Also, the equality case should take care of firing the timer right away, shouldn't it?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-27 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1505248645


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,
+s"Cannot register timers with incorrect TimeoutMode")
+verify(currState == INITIALIZED || currState == DATA_PROCESSED,
+s"Cannot register timers with " +
+  s"expiryTimestampMs=$expiryTimestampMs in current state=$currState")

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-27 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1505126889


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,
+s"Cannot register timers with incorrect TimeoutMode")
+verify(currState == INITIALIZED || currState == DATA_PROCESSED,
+s"Cannot register timers with " +
+  s"expiryTimestampMs=$expiryTimestampMs in current state=$currState")
+
+if (timerState.exists(expiryTimestampMs)) {
+  logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")

Review Comment:
   I thought we decided not to do this? So basically we allow a timestamp earlier than the latest to be generated, which would fire the timer right away in the next invocation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-27 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1505126284


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  private def encodeSecIndexKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(tsToKeyCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val bbuf = ByteBuffer.allocate(8)
+bbuf.order(ByteOrder.BIG_ENDIAN)
+bbuf.putLong(expiryTimestampMs)

Review Comment:
   

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-27 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1505125882


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+/**
+ * Class used to provide access to expired timer's expiry time and timeout mode. These values
+ * are only relevant if the ExpiredTimerInfo is valid.
+ */
+@Experimental
+@Evolving
+private[sql] trait ExpiredTimerInfo extends Serializable {
+  /**
+   * Check if provided ExpiredTimerInfo is valid.
+   */
+  def isValid(): Boolean
+
+  /**
+   * Get the expired timer's expiry time as milliseconds in epoch time.
+   */
+  def getExpiryTimeInMs(): Long
+
+  /**
+   * Get the expired timer's timeout mode.
+   */
+  def getTimeoutMode(): TimeoutMode

Review Comment:
   Done - modified this. Discussed offline, and we decided to pass the mode here to the `init` method. If we decide to support both timer types in the future, the mode can be changed, and we will also add the ability to specify the timer type for registering/deleting timers.



##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##
@@ -51,6 +51,25 @@ private[sql] trait StatefulProcessorHandle extends Serializable {
   /** Function to return queryInfo for currently running task */
   def getQueryInfo(): QueryInfo
 
+  /**
+   * Function to register a processing/event time based timer for given implicit key

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-27 Thread via GitHub


sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1504503668


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -252,16 +252,30 @@ class RocksDB(
 }
   }
 
+  /**
+   * Check whether the column family name is for internal column families.
+   * @param cfName - column family name
+   * @return - true if the column family is for internal use, false otherwise
+   */
private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'
+
   /**
* Create RocksDB column family, if not created already
*/
-  def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+  def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Unit = {
+// Remove leading and trailing whitespaces
+val cfName = colFamilyName.trim
+
+if (cfName == StateStore.DEFAULT_COL_FAMILY_NAME) {
   throw new SparkUnsupportedOperationException(
 errorClass = "_LEGACY_ERROR_TEMP_3197",
 messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
 }
 
+if (!isInternal && cfName.charAt(0) == '_') {

Review Comment:
   We should add a check that the cfName is not an empty string, and throw an error. [Right now we will end up with an IndexOutOfBoundsException]
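   A minimal sketch of the requested guard (illustrative only; the real method lives on the RocksDB class):
   
   private def checkInternalColumnFamilies(cfName: String): Boolean = {
     // Guard first so an empty name fails with a clear error rather than
     // an IndexOutOfBoundsException from charAt(0).
     require(cfName.nonEmpty, "Column family name cannot be empty")
     cfName.startsWith("_")
   }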



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,
+s"Cannot register timers with incorrect TimeoutMode")
+verify(currState == INITIALIZED || currState == DATA_PROCESSED,
+s"Cannot register timers with " +
+  s"expiryTimestampMs=$expiryTimestampMs in current state=$currState")
+
+if (timerState.exists(expiryTimestampMs)) {
+  logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs")

Review Comment:
   We should validate here that `expiryTimestampMs` is >= `batchTimestampMs`, or the watermark, based on the timeout mode.
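   Sketched standalone, the suggested validation could look like the following. The mode type and names here are stand-ins rather than the PR's actual types, and per the later discussion the PR ultimately allows past timestamps, which then fire on the next invocation:
   
   object TimerValidation {
     sealed trait Mode
     case object ProcTime extends Mode
     case object EvtTime extends Mode
   
     // Hypothetical guard per the review suggestion above.
     def validateExpiry(expiryMs: Long, mode: Mode, batchTimestampMs: Long, watermarkMs: Long): Unit = {
       val threshold = mode match {
         case ProcTime => batchTimestampMs // processing-time timers compare to the batch timestamp
         case EvtTime  => watermarkMs      // event-time timers compare to the watermark
       }
       require(expiryMs >= threshold,
         s"expiryTimestampMs=$expiryMs must be >= threshold=$threshold for mode=$mode")
     }
   }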



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-27 Thread via GitHub


sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1504459373


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  private def encodeSecIndexKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(tsToKeyCFName)
+}
+
+val keyByteArr = keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val bbuf = ByteBuffer.allocate(8)
+bbuf.order(ByteOrder.BIG_ENDIAN)
+bbuf.putLong(expiryTimestampMs)

Review Comment:
   Found 

Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-26 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1503733977


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -69,8 +70,20 @@ case class TransformWithStateExec(
 
   override def shortName: String = "transformWithStateExec"
 
-  // TODO: update this to run no-data batches when timer support is added
-  override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = false
+  override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
+    timeoutMode match {
+      // TODO: check if we can return true only if actual timers are registered

Review Comment:
   Yea - at the time this is called, I believe we don't have the storeRDD or 
store instance available directly. We could potentially track a count of 
registered timers, but we haven't tried that change/optimization yet.
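For context, the overall shape this method takes once timers are supported is roughly the following; a sketch assuming the operator exposes `eventTimeWatermarkForEviction` the way other stateful operators do, not necessarily the exact merged code:

override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
  timeoutMode match {
    case ProcessingTime =>
      // Conservatively request no-data batches so processing-time timers get
      // a chance to fire; see the TODO above about tracking a count of
      // registered timers to make this less aggressive.
      true
    case EventTime =>
      // Only worth re-running when the watermark has actually advanced.
      eventTimeWatermarkForEviction.isDefined &&
        newInputWatermark > eventTimeWatermarkForEviction.get
    case _ =>
      false
  }
}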






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-26 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1503733123


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -48,16 +48,19 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    * @param inputRows - iterator of input rows associated with grouping key
    * @param timerValues - instance of TimerValues that provides access to current processing/event
    *                      time if available
+   * @param expiredTimerInfo - instance of ExpiredTimerInfo that provides access to expired timer
+   *                           if applicable
    * @return - Zero or more output rows
    */
   def handleInputRows(
       key: K,
       inputRows: Iterator[I],
-      timerValues: TimerValues): Iterator[O]
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[O]

Review Comment:
   Yea, I thought of that, but we would have the same problem with the Java API 
in the future.
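The Java concern can be made concrete: Scala default arguments are not visible to Java callers or implementors, so making the new parameter optional would only help Scala users. A hypothetical sketch of one such alternative (`ExpiredTimerInfo.none` is an invented placeholder, not part of this PR):

trait StatefulProcessorWithDefault[K, I, O] {
  // Scala callers could omit expiredTimerInfo, but javac does not see Scala
  // default arguments, so Java implementations and callers would still have
  // to spell it out, defeating the purpose of the default.
  def handleInputRows(
      key: K,
      inputRows: Iterator[I],
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo = ExpiredTimerInfo.none): Iterator[O]
}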






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-26 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1503732422


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,

Review Comment:
   The existing one is tied to `GroupState`, so I did not want to reuse that. 
Also, if we add more modes here in the future, I thought it's better to keep 
this generic and separate for this API.
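For a sense of the end-to-end flow, a minimal usage sketch of registering and handling a timer inside `handleInputRows`. The `handle` variable is assumed to be captured during processor initialization, and the remaining `StatefulProcessor` members are elided; this is illustrative, not code from the PR:

// Assumes `handle: StatefulProcessorHandle` was captured in init();
// other StatefulProcessor methods are elided for brevity.
override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues,
    expiredTimerInfo: ExpiredTimerInfo): Iterator[String] = {
  if (expiredTimerInfo.isValid()) {
    // Invoked for an expired timer: emit a marker row for this key.
    Iterator(s"$key expired at ${expiredTimerInfo.getExpiryTimeInMs()}")
  } else {
    // Normal data path: (re-)arm a processing-time timer 10 seconds out.
    handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000L)
    inputRows
  }
}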






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-26 Thread via GitHub


anishshri-db commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1503731606


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+/**
+ * Class used to provide access to expired timer's expiry time and timeout mode. These values
+ * are only relevant if the ExpiredTimerInfo is valid.
+ */
+@Experimental
+@Evolving
+private[sql] trait ExpiredTimerInfo extends Serializable {
+  /**
+   * Check if provided ExpiredTimerInfo is valid.
+   */
+  def isValid(): Boolean
+
+  /**
+   * Get the expired timer's expiry time as milliseconds in epoch time.
+   */
+  def getExpiryTimeInMs(): Long
+
+  /**
+   * Get the expired timer's timeout mode.
+   */
+  def getTimeoutMode(): TimeoutMode

Review Comment:
   But that is not available to the `StatefulProcessor`. Do you prefer to pass 
the timeout mode in the `init` method instead?
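The alternative floated here would look roughly like this (signature is illustrative only). Since the timeout mode is fixed for the lifetime of the query, handing it to the processor once at initialization avoids carrying it on every `ExpiredTimerInfo`:

// Surface the mode once at initialization instead of on every expired timer.
def init(outputMode: OutputMode, timeoutMode: TimeoutMode): Unit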






Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-26 Thread via GitHub


sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1503581091


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+/**
+ * Class used to provide access to expired timer's expiry time and timeout mode. These values
+ * are only relevant if the ExpiredTimerInfo is valid.
+ */
+@Experimental
+@Evolving
+private[sql] trait ExpiredTimerInfo extends Serializable {
+  /**
+   * Check if provided ExpiredTimerInfo is valid.
+   */
+  def isValid(): Boolean
+
+  /**
+   * Get the expired timer's expiry time as milliseconds in epoch time.
+   */
+  def getExpiryTimeInMs(): Long
+
+  /**
+   * Get the expired timer's timeout mode.
+   */
+  def getTimeoutMode(): TimeoutMode

Review Comment:
   Would this ever be different from the timeout mode provided to the 
`transformWithState` API? If not, do we need this here? 



##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -48,16 +48,19 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    * @param inputRows - iterator of input rows associated with grouping key
    * @param timerValues - instance of TimerValues that provides access to current processing/event
    *                      time if available
+   * @param expiredTimerInfo - instance of ExpiredTimerInfo that provides access to expired timer
+   *                           if applicable
    * @return - Zero or more output rows
    */
   def handleInputRows(
       key: K,
       inputRows: Iterator[I],
-      timerValues: TimerValues): Iterator[O]
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[O]

   /**
    * Function called as the last method that allows for users to perform
    * any cleanup or teardown operations.
    */
-  def close (): Unit
+  def close (): Unit = {}

Review Comment:
   Nice idea to provide a default implementation here. 



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+    new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+    verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,
+      s"Cannot register timers with incorrect TimeoutMode")
+    verify(currState == INITIALIZED || currState == DATA_PROCESSED,
+      s"Cannot register timers with " +
+        s"expiryTimestampMs=$expiryTimestampMs in current state=$currState")

Review Comment:
   We should use the NERF framework for these user errors. 
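NERF here refers to Spark's error-class framework. Migrated, the check might look roughly like the following; the error class name and parameters are illustrative, not actual entries in error-classes.json:

if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) {
  // Illustrative error class; a real one would be registered in
  // common/utils/src/main/resources/error/error-classes.json.
  throw new SparkUnsupportedOperationException(
    errorClass = "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE",
    messageParameters = Map(
      "operationType" -> "register_timer",
      "handleState" -> currState.toString))
}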



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,42 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+verify(timeoutMode == ProcessingTime || timeoutMode == EventTime,

Review Comment:
   Shouldn't this be same as the timeoutMode in transformWithState API? 



##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##
@@ -51,6 +51,25 @@ private[sql] trait StatefulProcessorHandle extends Serializable {
   /** Function to return queryInfo for currently running task */
   def getQueryInfo(): QueryInfo

+  /**
+   * Function to register a processing/event time based timer for given implicit key

Review Comment:
   [nit] `implicit key` -> `implicit grouping key`. 




Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-22 Thread via GitHub


anishshri-db commented on PR #45051:
URL: https://github.com/apache/spark/pull/45051#issuecomment-1960408558

   @HeartSaVioR - PTAL, thx!

