HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153976547
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
private val EMPTY_ROW =
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
}
+
+case class StreamingDeduplicateWithinWatermarkExec(
+ keyExpressions: Seq[Attribute],
+ child: SparkPlan,
+ stateInfo: Option[StatefulOperatorStateInfo] = None,
+ eventTimeWatermarkForLateEvents: Option[Long] = None,
+ eventTimeWatermarkForEviction: Option[Long] = None)
+ extends BaseStreamingDeduplicateExec {
+
+ protected val schemaForValueRow: StructType = StructType(
+ Array(StructField("expiresAt", LongType, nullable = false)))
Review Comment:
Thanks for replicating the message :) That said, it still seems safe
to store the underlying value as-is, with LongType.
Users might find it odd if they use TimestampType and TimestampNTZType
interchangeably in the same column across multiple microbatches, but that
situation already exists today: if a query ran from the US, stopped, and was
somehow rerun from Korea, the stored value doesn't change, yet people may be
confused by the representation. Handling this nicely seems to be beyond the
scope of this PR.
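To illustrate the point about representation vs. value: a minimal sketch (not Spark code; the object name, the literal micros value, and the two zone choices are hypothetical) showing that a Long holding microseconds since the epoch, like the `expiresAt` field above, is identical wherever the query runs, and only the human-readable rendering depends on the zone:

```scala
import java.time.{Instant, ZoneId}

object ExpiresAtRendering {
  def main(args: Array[String]): Unit = {
    // Hypothetical stored state value: microseconds since the Unix epoch.
    // The state store would persist exactly this Long, unchanged across runs.
    val expiresAtMicros = 1680000000000000L

    // Reconstruct the instant from the raw Long (split into seconds + nanos).
    val instant = Instant.ofEpochSecond(
      expiresAtMicros / 1000000L, (expiresAtMicros % 1000000L) * 1000L)

    // Same underlying value; only the display differs by session time zone.
    println(instant.atZone(ZoneId.of("America/Los_Angeles")))
    println(instant.atZone(ZoneId.of("Asia/Seoul")))
  }
}
```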
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]