HeartSaVioR commented on PR #48570: URL: https://github.com/apache/spark/pull/48570#issuecomment-2437017582
We actually have another issue with this, and fixing it needs some API design: we only persist the final (global) watermark value into the commit log, not the per-node watermark values. This can lead to a tricky scenario like the following.

Suppose a streaming query has streaming sources A, B, and C. The query runs on a schedule (with Trigger.AvailableNow), so it is expected to stop once all available data is processed.

* 1st trigger: new data is available for A and B. Spark processes the data and decides the watermark for the next batch to be 0, because no data has been observed in C.
* 2nd trigger: the query starts from the committed batch of the 1st trigger, with watermark = 0. New data is available for B and C. Spark processes the data and decides the watermark for the next batch to be 0, because no data has been observed in A during this run (the per-node watermark for A from the 1st trigger was not persisted).

This is incorrect, because across the two triggers Spark has processed data from all streaming sources A, B, and C.

To solve this, we need a consistent order (or alias) for these watermark nodes "across query runs", which is tricky. We could probably rely on the traversal order, as we do for the stateful operator ID, but ideally we would allow users to set an alias on the node/operator.
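To make the failure concrete, here is a minimal Python sketch that simulates the two triggers. It is not Spark code. `WatermarkTracker`, `update`, and `run_scenario` are hypothetical names, and the event-time values are made up. The sketch assumes a watermark delay of 0 and a few behaviors the scenario implies:

* A watermark node with no observed data contributes 0.
* The global watermark is the minimum across nodes (Spark's default `min` policy for multiple watermarks).
* The global watermark never moves backwards.

The keys "A", "B", "C" stand in for whatever stable identifier (traversal index or user-set alias) would identify a watermark node across query runs.

```python
from typing import Dict, List, Optional


class WatermarkTracker:
    """Hypothetical, simplified model of per-node watermark tracking (not Spark's class).

    Assumptions: delay is 0, a node with no observed data contributes 0,
    the global watermark is the min across nodes and never goes backwards.
    """

    def __init__(self, global_watermark: int = 0,
                 node_watermarks: Optional[Dict[str, int]] = None):
        self.global_watermark = global_watermark
        # Per-node watermarks keyed by a node identifier (index or alias).
        self.node_watermarks: Dict[str, int] = dict(node_watermarks or {})

    def update(self, nodes: List[str], max_event_time: Dict[str, int]) -> int:
        for node in nodes:
            if node in max_event_time:
                prev = self.node_watermarks.get(node)
                new = max_event_time[node]
                if prev is None or new > prev:
                    self.node_watermarks[node] = new
            elif node not in self.node_watermarks:
                # No data seen for this node (in this run): contribute 0.
                self.node_watermarks[node] = 0
        chosen = min(self.node_watermarks[n] for n in nodes)
        # The global watermark only moves forward.
        self.global_watermark = max(self.global_watermark, chosen)
        return self.global_watermark


def run_scenario(persist_per_node: bool) -> int:
    nodes = ["A", "B", "C"]
    # 1st trigger (query run 1): new data only for A and B (hypothetical event times in ms).
    run1 = WatermarkTracker()
    run1.update(nodes, {"A": 100, "B": 200})
    # Only the global watermark survives the restart unless per-node values are persisted.
    saved_nodes = run1.node_watermarks if persist_per_node else None
    # 2nd trigger (query run 2): restart from the committed state; new data for B and C.
    run2 = WatermarkTracker(run1.global_watermark, saved_nodes)
    return run2.update(nodes, {"B": 300, "C": 400})


print(run_scenario(persist_per_node=False))  # 0
print(run_scenario(persist_per_node=True))   # 100
```

If only the global watermark is persisted, the 2nd trigger still ends at 0, even though every source has produced data. If the per-node values are persisted under a key that stays stable across runs, the 2nd trigger ends at 100, the minimum of the per-source maxima. The hard part is making sure that key really is stable when the query is restarted.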
