HeartSaVioR commented on code in PR #53930:
URL: https://github.com/apache/spark/pull/53930#discussion_r2844589707


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -27,16 +27,676 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{END_INDEX, START_INDEX, 
STATE_STORE_ID}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, JoinedRow, Literal, SafeProjection, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, JoinedRow, Literal, NamedExpression, 
SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.execution.metric.SQLMetric
-import 
org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorStateInfo
-import 
org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOpStateStoreCheckpointInfo
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
 StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._
-import 
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
 KeyStateEncoderSpec, NoopStatePartitionKeyExtractor, 
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast, 
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema, 
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics, 
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay}
-import org.apache.spark.sql.types.{BooleanType, LongType, StructField, 
StructType}
+import 
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
 KeyStateEncoderSpec, NoopStatePartitionKeyExtractor, 
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast, 
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema, 
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics, 
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay, 
TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeyStateEncoderSpec, 
TimestampKeyStateEncoder}
+import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType, 
StructField, StructType}
 import org.apache.spark.util.NextIterator
 
+/**
+ * Base trait of the state manager for stream-stream symmetric hash join 
operator.
+ *
+ * This defines the basic APIs for the state manager, except the methods for 
eviction which are
+ * defined in separate traits - See [[SupportsEvictByCondition]] and 
[[SupportsEvictByTimestamp]].
+ *
+ * Implementation classes are expected to inherit those traits as needed, 
depending on the eviction
+ * strategy they support.
+ */
+trait SymmetricHashJoinStateManager {
+  import SymmetricHashJoinStateManager._
+
+  def append(key: UnsafeRow, value: UnsafeRow, matched: Boolean): Unit
+
+  def get(key: UnsafeRow): Iterator[UnsafeRow]
+
+  def getJoinedRows(
+      key: UnsafeRow,
+      generateJoinedRow: InternalRow => JoinedRow,
+      predicate: JoinedRow => Boolean,
+      excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow]
+
+  def iterator: Iterator[KeyToValuePair]
+
+  def commit(): Unit
+
+  def abortIfNeeded(): Unit
+
+  def metrics: StateStoreMetrics
+
+  def getLatestCheckpointInfo(): JoinerStateStoreCkptInfo
+}
+
+/**
+ * This trait is specific to help the old version of state manager 
implementation (v1-v3) to work
+ * with existing tests which look up the state store with key with index.
+ */
+trait SupportsIndexedKeys {
+  def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow
+
+  protected[streaming] def updateNumValuesTestOnly(key: UnsafeRow, numValues: 
Long): Unit
+}
+
+/**
+ * This trait is for state manager implementations that support eviction by 
condition.
+ * This is for the state manager implementations which have to perform full 
scan
+ * for eviction.
+ */
+trait SupportsEvictByCondition { self: SymmetricHashJoinStateManager =>
+  import SymmetricHashJoinStateManager._
+
+  def evictByKeyCondition(removalCondition: UnsafeRow => Boolean): Long
+
+  def evictAndReturnByKeyCondition(
+      removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
+
+  def evictByValueCondition(removalCondition: UnsafeRow => Boolean): Long
+
+  def evictAndReturnByValueCondition(
+      removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
+}
+
+/**
+ * This trait is for state manager implementations that support eviction by 
timestamp. This is for
+ * the state manager implementations which maintain the state with event time 
and can efficiently
+ * scan the keys with event time smaller than the given timestamp for eviction.
+ */
+trait SupportsEvictByTimestamp { self: SymmetricHashJoinStateManager =>
+  import SymmetricHashJoinStateManager._
+
+  def evictByTimestamp(endTimestamp: Long): Long
+
+  def evictAndReturnByTimestamp(endTimestamp: Long): Iterator[KeyToValuePair]
+}
+
+/**
+ * The version 4 of stream-stream join state manager implementation, which is 
designed to optimize
+ * the eviction with watermark. Previous versions require full scan to find 
the keys to evict,
+ * while this version only scans the keys with event time smaller than the 
watermark.
+ *
+ * In this implementation, we no longer build a logical array of values; 
instead, we store the
+ * (key, timestamp) -> values in the primary store, and maintain a secondary 
index of
+ * (timestamp, key) to scan the keys to evict for each watermark. To retrieve 
the values for a key,
+ * we perform prefix scan with the key to get all the (key, timestamp) -> 
values.
+ *
+ * Refer to the [[KeyWithTsToValuesStore]] and [[TsWithKeyTypeStore]] for more 
details.
+ */
+class SymmetricHashJoinStateManagerV4(
+    joinSide: JoinSide,
+    inputValueAttributes: Seq[Attribute],
+    joinKeys: Seq[Expression],
+    stateInfo: Option[StatefulOperatorStateInfo],
+    storeConf: StateStoreConf,
+    hadoopConf: Configuration,
+    partitionId: Int,
+    keyToNumValuesStateStoreCkptId: Option[String],
+    keyWithIndexToValueStateStoreCkptId: Option[String],
+    stateFormatVersion: Int,
+    skippedNullValueCount: Option[SQLMetric] = None,
+    useStateStoreCoordinator: Boolean = true,
+    snapshotOptions: Option[SnapshotOptions] = None,
+    joinStoreGenerator: JoinStateManagerStoreGenerator)
+  extends SymmetricHashJoinStateManager with SupportsEvictByTimestamp with 
Logging {
+
+  import SymmetricHashJoinStateManager._
+
+  protected val keySchema = StructType(
+    joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", 
k.dataType, k.nullable) })
+  protected val keyAttributes = toAttributes(keySchema)
+  private val eventTimeColIdxOpt = WatermarkSupport.findEventTimeColumnIndex(
+    inputValueAttributes,
+    // NOTE: This does not accept multiple event time columns. This is not the 
same with the
+    // operator which we offer the backward compatibility, but it involves too 
many layers to
+    // pass the information. The information is in SQLConf.
+    allowMultipleEventTimeColumns = false)
+
+  private val random = new scala.util.Random(System.currentTimeMillis())
+  private val bucketSizeForNoEventTime = 1024
+  private val extractEventTimeFn: UnsafeRow => Long = { row =>
+    eventTimeColIdxOpt match {
+      case Some(idx) =>
+        val attr = inputValueAttributes(idx)
+
+        if (attr.dataType.isInstanceOf[StructType]) {
+          // NOTE: We assume this is window struct, as same as 
WatermarkSupport.watermarkExpression
+          row.getStruct(idx, 2).getLong(1)
+        } else {
+          row.getLong(idx)
+        }
+
+      case _ =>
+        // When event time column is not available, we will use random 
bucketing strategy to decide
+        // where the new value will be stored. There is a trade-off between 
the bucket size and the
+        // number of values in each bucket; we can tune the bucket size with 
the configuration if
+        // we figure out the magic number to not work well.
+        random.nextInt(bucketSizeForNoEventTime)

Review Comment:
   Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to