rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151447031


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = 
WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = 
eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
+
+    child.execute().mapPartitionsWithStateStore(
+      getStateInfo,
+      keyExpressions.toStructType,
+      schemaForTimeoutRow,
+      numColsPrefixKey = 0,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+
+      val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+      val timeoutRow = timeoutToUnsafeRow(new 
SpecificInternalRow(schemaForTimeoutRow))
+
+      val numOutputRows = longMetric("numOutputRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+      val numRemovedStateRows = longMetric("numRemovedStateRows")
+      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+      val commitTimeMs = longMetric("commitTimeMs")
+      val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+      val baseIterator = watermarkPredicateForDataForLateEvents match {
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, 
predicate)
+        case None => iter
+      }
+
+      val updatesStartTimeNs = System.nanoTime
+
+      val result = baseIterator.filter { r =>
+        val row = r.asInstanceOf[UnsafeRow]
+        val key = getKey(row)
+        val value = store.get(key)
+        if (value == null) {
+          val timestamp = row.getLong(eventTimeColOrdinal)
+          // The unit of timestamp in Spark is microseconds, convert the delay 
threshold.
+          val expiresAt = timestamp + delayThresholdMillis * 1000
+
+          timeoutRow.setLong(0, expiresAt)
+          store.put(key, timeoutRow)
+
+          numUpdatedStateRows += 1
+          numOutputRows += 1
+          true
+        } else {
+          // Drop duplicated rows
+          numDroppedDuplicateRows += 1
+          false
+        }
+      }
+
+      CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
+        allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - 
updatesStartTimeNs)
+        allRemovalsTimeMs += timeTakenMs {
+          // Convert watermark value to microsecond
+          val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000
+          store.iterator().foreach { rowPair =>

Review Comment:
   It depends on the operator. E.g., for windowed aggregation: if we keep the 
window as the first column and the state store supports an efficient range 
iterator, it could minimize scanning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to