HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153935588
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
private val EMPTY_ROW =
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
}
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
Review Comment:
Let's consolidate this into the comment thread below. I pinged Gengliang there
to understand the intended use cases for TimestampType vs. TimestampNTZType,
whether the types are somehow interchangeable, etc. Until that is settled, I'll
just roll back the change to match flatMapGroupsWithState.
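For context on why the `expiresAt` column type matters, here is a rough sketch of how a per-key `expiresAt` value, kept as a plain Long in epoch milliseconds like the `LongType` field in `schemaForValueRow` above, might drive deduplication within the watermark. This is a simplified, hypothetical in-memory model and not the PR's implementation or any Spark API. It assumes `expiresAt = eventTime + delayThreshold`, that events older than the late-events watermark are dropped, and that a key is evicted once the eviction watermark passes its `expiresAt`. The class name, `process`, `evict`, and `delayThresholdMs` are made up for illustration.

```scala
import scala.collection.mutable

// Hypothetical sketch only: models "deduplicate within watermark" with a plain
// in-memory map from dedup key to expiresAt (epoch millis, stored as a Long).
// This is NOT Spark's StreamingDeduplicateWithinWatermarkExec.
final class DedupWithinWatermarkSketch(delayThresholdMs: Long) {
  private val state = mutable.Map.empty[String, Long]

  /** Returns true if the event should be emitted (first occurrence of its key). */
  def process(key: String, eventTimeMs: Long, watermarkForLateEventsMs: Long): Boolean = {
    if (eventTimeMs < watermarkForLateEventsMs) {
      false // late event: dropped without touching state
    } else if (state.contains(key)) {
      false // duplicate while the key is still retained in state
    } else {
      // Assumed semantics: the key is kept until the watermark passes eventTime + delay.
      state.update(key, eventTimeMs + delayThresholdMs)
      true
    }
  }

  /** Removes every key whose expiresAt is strictly before the eviction watermark. */
  def evict(watermarkForEvictionMs: Long): Unit = {
    // Collect first, then remove, so the map is not mutated while being iterated.
    val expired = state.collect { case (k, expiresAt) if expiresAt < watermarkForEvictionMs => k }
    state --= expired
  }

  def size: Int = state.size
}
```

With a plain Long, the stored value compares directly against the millisecond watermark values; a timestamp-typed column would need an explicit unit convention, which is part of what the linked thread is trying to pin down.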
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]