HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153114879


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
   It actually came from flatMapGroupsWithState, but now I understand the 
question you raised offline. Ideally, we should use the same type as the event 
time column and disallow changing the type of that column.
   
   While we could try to address the issue in this PR, doing so would probably 
require a new state format for any other stateful operators that have the same 
issue. (I see the same issue in flatMapGroupsWithState but, hopefully, not in 
the others.)
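   
   To make the idea concrete, here is a rough sketch of what "use the event 
time column's type" could look like. It is only an illustration and is not 
code from this PR: `ExpiresAtSchemaSketch`, `schemaFor`, and 
`checkEventTimeType` are hypothetical names, and the check assumes the stored 
value schema is available when the query restarts.

```scala
import org.apache.spark.sql.types._

object ExpiresAtSchemaSketch {
  // Value schema as written in this PR: the expiry is stored as a raw long.
  val currentSchema: StructType = StructType(
    Array(StructField("expiresAt", LongType, nullable = false)))

  // Hypothetical alternative: reuse the event time column's own type.
  def schemaFor(eventTimeType: DataType): StructType = StructType(
    Array(StructField("expiresAt", eventTimeType, nullable = false)))

  // Hypothetical guard: fail if the event time column's type no longer
  // matches the type recorded in the stored value schema.
  def checkEventTimeType(stored: StructType, eventTimeType: DataType): Unit = {
    val storedType = stored("expiresAt").dataType
    require(
      storedType == eventTimeType,
      s"Event time column type changed from ${storedType.simpleString} " +
        s"to ${eventTimeType.simpleString}")
  }
}
```

   With something like this, existing state written with `LongType` would fail 
the check against a `TimestampType` event time column, which is why a new 
state format would probably be needed.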



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

