HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153976547


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
   Thanks for replicating the message :) That said, it seems to be still safe 
to store underlying value as it is, with LongType.
   
   Maybe users may feel odd if they use TimestampType and TimestampNTZType 
interchangeably in the same column across multiple microbatches, but it's also 
an existing case if the query ran from US and stopped and somehow reran from 
Korea. The value doesn't change, but people may be confused with the 
representation. Treating this nicely seems to be beyond the scope.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to