Kimahriman commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r2096752531


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +1023,67 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAtMicros", LongType, nullable = false)))
+
+  protected val extraOptionOnStateStore: Map[String, String] = Map.empty
+
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = {
+    val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow)
+    val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForValueRow))
+    Some(timeoutRow)
+  }
+
+  protected def putDupInfoIntoState(
+      store: StateStore,
+      data: UnsafeRow,
+      key: UnsafeRow,
+      reusedDupInfoRow: Option[UnsafeRow]): Unit = {
+    assert(reusedDupInfoRow.isDefined, "This should have reused row.")
+    val timeoutRow = reusedDupInfoRow.get
+
+    // We expect the data type of the event time column to be TimestampType or TimestampNTZType,
+    // both of which are internally represented as Long.
+    val timestamp = data.getLong(eventTimeColOrdinal)
+    // Timestamps in Spark are in microseconds, so convert the delay threshold to micros as well.
+    val expiresAt = timestamp + DateTimeUtils.millisToMicros(delayThresholdMs)

Review Comment:
   I think the easiest example, using a 2 hour watermark delay, is:
   - Event A arrives in batch 0 at 12:00. It's stored in the state with `expiresAt = 14:00` (12:00 + 2 hours). Batch 0 commits with a next watermark timestamp of 10:00 (12:00 - 2 hours).
   - Event B arrives in batch 1 at 15:00. Batch 1 commits with a next watermark timestamp of 13:00 (15:00 - 2 hours).
   - Event A arrives again in batch 2 at 15:59. It gets deduped and dropped because `A` is still stored in the state. Batch 2 commits with a next watermark timestamp of 13:59 (15:59 - 2 hours).
   - Event C arrives in batch 3 at 16:01. Batch 3 commits with a next watermark timestamp of 14:01 (16:01 - 2 hours).
   - Batch 4 runs and finally evicts event A from the state, because the watermark timestamp (14:01) is greater than the `expiresAt` timestamp (14:00).
   
   The two occurrences of event A were 3 hours and 59 minutes apart yet still got deduped, which to me breaks the contract of a 2 hour watermark delay.
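   
   To make the arithmetic concrete, here's a minimal standalone Scala sketch of the timeline above. This is not Spark code and every name in it is hypothetical; it only mirrors the two formulas at play, `expiresAt = eventTime + delay` and `watermark = maxEventTime - delay`:
   
   ```scala
   import java.time.LocalTime
   
   // Hypothetical standalone sketch, not Spark code: replays the timeline above
   // using expiresAt = eventTime + delay and watermark = maxEventTime - delay.
   object DedupTimelineSketch extends App {
     val delayMinutes = 120L // 2 hour watermark delay
   
     // Event A arrives at 12:00 in batch 0 and is stored with its expiration time.
     val expiresAt = LocalTime.of(12, 0).plusMinutes(delayMinutes) // 14:00
   
     // Max event time observed as of each committed batch.
     val batchMaxEventTimes = Seq(
       LocalTime.of(12, 0),  // batch 0: A
       LocalTime.of(15, 0),  // batch 1: B
       LocalTime.of(15, 59), // batch 2: A again -- deduped, state untouched
       LocalTime.of(16, 1)   // batch 3: C
     )
   
     batchMaxEventTimes.zipWithIndex.foreach { case (maxEventTime, batch) =>
       val watermark = maxEventTime.minusMinutes(delayMinutes)
       // A is only evicted once the committed watermark passes its expiresAt,
       // which first happens with batch 3's watermark (14:01), i.e. in batch 4.
       println(s"batch $batch watermark=$watermark evictsA=${watermark.isAfter(expiresAt)}")
     }
   }
   ```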


