HeartSaVioR commented on a change in pull request #31989:
URL: https://github.com/apache/spark/pull/31989#discussion_r606898837



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.types.{ArrayType, StructType, TimestampType}
+import org.apache.spark.util.NextIterator
+
+/**
+ * Base trait for state manager purposed to be used from streaming session 
window aggregation.
+ */
+sealed trait StreamingSessionWindowStateManager extends Serializable {
+
+  def getKey(row: InternalRow): UnsafeRow
+
+  def getStartTime(row: InternalRow): Long
+
+  /**
+   * Returns all stored keys.
+   */
+  def getAllKeys(): Iterator[UnsafeRow]
+
+  /**
+   * Returns a list of states for the key. These states are candidates for 
session window
+   * merging.
+   */
+  def getStates(key: UnsafeRow): Seq[UnsafeRow]
+
+  /**
+   * Returns a list of start times for session windows belonging to the given 
key.
+   */
+  def getStartTimeList(key: UnsafeRow): Seq[Long]
+
+  /**
+   * Returns the state of given key and start time.
+   */
+  def getState(key: UnsafeRow, startTime: Long): UnsafeRow
+
+  /**
+   * Returns a list of states for all keys.
+   */
+  def getStates(): Seq[UnsafeRow]
+
+  /**
+   * Puts a state row into the state with given key and start time. Note that 
this method
+   * does not update the start time into the list of start times for the given 
key. To update
+   * the list, please call `putStartTimeList`.
+   */
+  def putState(key: UnsafeRow, startTime: Long, value: UnsafeRow): Unit
+
+  /**
+   * Puts a list of states for given key. This method will update the list of 
start times
+   * for the given key.
+   */
+  def putStates(key: UnsafeRow, values: Seq[UnsafeRow]): Unit
+
+  /**
+   * Puts a list of start times of session windows for given key into the 
state. The list
+   * of start times must be sorted.
+   */
+  def putStartTimeList(key: UnsafeRow, startTimes: Seq[Long]): Unit
+
+  /**
+   * Removes specified state for the given key and start time.
+   */
+  def removeState(key: UnsafeRow, startTime: Long): Unit
+
+  /**
+   * Removes given key from the state store. This method will remove all 
states associated
+   * with the given key if any.
+   */
+  def removeKey(key: UnsafeRow): Unit
+
+  /**
+   * Given a callback function used to update state store metrics, updates the 
metrics of all
+   * state stores.
+   */
+  def updateMetrics(updateFunc: StateStoreMetrics => Unit)
+
+  /**
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the values such that 
value satisfies the
+   * predicate, where producing an element removes the value from the state 
store and producing
+   * all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
+   */
+  def removeByValueCondition(removalCondition: UnsafeRow => Boolean): 
Iterator[UnsafeRow]

Review comment:
       And this change is in #31937.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to