HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895702492

   I'm not really sure I understand the logic correctly.
   
   ```
   val dedupeInputData = MemoryStream[(String, Int)]
   val dedupe = dedupeInputData.toDS()
     .withColumn("eventTime", timestamp_seconds($"_2"))
     .withWatermark("eventTime", "10 second")
     .dropDuplicatesWithinWatermark("_1")
     .select($"_1", $"eventTime".cast("long").as[Long])
   ```
   
   ```
   20:59:20.593 WARN org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec: CALL [withNewChildInternal] newChild: *(1) !Project [none#0, timestamp_seconds(none#1) AS #0]
   ```
   
   The Project above corresponds to the withColumn call in the query. Do we expect exprId 0 to be used "twice"? The two occurrences refer to different columns but share the same exprId.
   (FYI, eventTime in EventTimeWatermarkExec is canonicalized as `none#1`.)
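
   One possible reading, sketched as a toy model and not Spark's actual code: if canonicalization rewrites references to child output by ordinal position, and separately numbers the aliases a node produces from 0, then the two `#0`s come from different counters. Everything below (`Expr`, `Attr`, `Alias`, `TimestampSeconds`, `canonicalizeProject`, and the exprIds 7, 8, 12) is hypothetical and only mimics the shape of the log line above; whether Catalyst really behaves this way is an assumption to verify against the source.

   ```scala
   // Toy model only: these are NOT Spark's Catalyst classes.
   sealed trait Expr
   case class Attr(name: String, exprId: Long) extends Expr
   case class TimestampSeconds(child: Expr) extends Expr
   case class Alias(child: Expr, name: String, exprId: Long) extends Expr

   object CanonicalizeSketch {
     // Assumption: a reference to a child output column is rewritten to
     // `none#<ordinal>`, where ordinal is its position in the child output.
     def normalize(e: Expr, input: Seq[Attr]): Expr = e match {
       case a: Attr =>
         val ordinal = input.indexWhere(_.exprId == a.exprId)
         if (ordinal == -1) a else Attr("none", ordinal.toLong)
       case TimestampSeconds(c) => TimestampSeconds(normalize(c, input))
       case Alias(c, n, id)     => Alias(normalize(c, input), n, id)
     }

     // Assumption: top-level aliases lose their name and get ids counted
     // from 0, independently of the ordinals used for input references.
     def canonicalizeProject(projectList: Seq[Expr], input: Seq[Attr]): Seq[Expr] = {
       var nextAliasId = -1L
       projectList.map {
         case Alias(c, _, _) =>
           nextAliasId += 1
           Alias(normalize(c, input), "", nextAliasId)
         case other => normalize(other, input)
       }
     }

     // Render in a style similar to the plan string in the log.
     def render(e: Expr): String = e match {
       case Attr(n, id)         => s"$n#$id"
       case TimestampSeconds(c) => s"timestamp_seconds(${render(c)})"
       case Alias(c, n, id)     => s"${render(c)} AS $n#$id"
     }

     def main(args: Array[String]): Unit = {
       // Child output: (_1, _2) with arbitrary, made-up exprIds.
       val input = Seq(Attr("_1", 7L), Attr("_2", 8L))
       // Project list modelling withColumn("eventTime", timestamp_seconds($"_2")).
       val project = Seq(
         Attr("_1", 7L),
         Alias(TimestampSeconds(Attr("_2", 8L)), "eventTime", 12L))
       val rendered = canonicalizeProject(project, input).map(render)
       // prints [none#0, timestamp_seconds(none#1) AS #0]
       println(rendered.mkString("[", ", ", "]"))
     }
   }
   ```

   In this model `none#0` means "first column of the child output", while `AS #0` means "first alias produced by this node", so the shared number would not by itself imply the same column.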
   
   To reproduce, change EventTimeWatermarkExec as follows and run the new test in this PR.
   
   ```
     override protected def withNewChildInternal(newChild: SparkPlan): EventTimeWatermarkExec = {
       val metadataMap = newChild.output.map { attr =>
         attr.name -> attr.metadata
       }
       logWarning(s"CALL [withNewChildInternal] newChild: $newChild, metadata: $metadataMap")
       copy(child = newChild)
     }
   ```


