[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-23 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16914044#comment-16914044
 ] 

zhijiang commented on FLINK-13798:
--

After further research on this issue, this refactoring does not seem feasible 
for now.

The current approach of checking the status before emitting a watermark works 
on both the input and output sides:
 * For the head operator in a task, the StatusWatermarkValve in the 
InputProcessor checks the status.
 * For the chained operators in a task, the chained output generated by 
OperatorChain checks the status.
 * For the tail operator in a task, the RecordWriterOutput checks the status.

So the internal logic of {{emitWatermark/onProcessingTime}} in the different 
operators does not need to do anything. But if we remove the check from 
RecordWriterOutput, we need to add this logic to every involved operator, 
which covers many places (TimestampsAndPeriodicWatermarksOperator, 
KeyedCoProcessOperatorWithWatermarkDelay, ExtractTimestampsOperator, 
MiniBatchAssignerOperator, WatermarkAssignerOperator, AbstractStreamOperator, 
etc.). If we migrate this check to the upper layer, it is easy to miss some 
places or corner cases. The current check in the output exists precisely to 
avoid missing any place across the different kinds of operators.
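To make the trade-off concrete, here is a minimal Java sketch of checking the status once in the output layer. All types here (`Status`, `WatermarkOutput`, `StatusHolder`, `RecordingOutput`, `StatusCheckingOutput`) are hypothetical stand-ins, not Flink's actual `StreamStatus`, `Output` or `RecordWriterOutput` API; the sketch only shows that every operator emitting a watermark passes through one wrapper, so no operator needs its own check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stream status (not Flink's StreamStatus class).
enum Status { ACTIVE, IDLE }

// Hypothetical minimal output interface, covering watermarks only.
interface WatermarkOutput {
    void emitWatermark(long timestamp);
}

// Hypothetical mutable status holder, playing the role of a status maintainer.
class StatusHolder {
    private Status status = Status.ACTIVE;

    Status get() { return status; }

    void set(Status newStatus) { status = newStatus; }
}

// Collects the watermarks that actually reach the downstream side.
class RecordingOutput implements WatermarkOutput {
    final List<Long> emitted = new ArrayList<>();

    @Override
    public void emitWatermark(long timestamp) { emitted.add(timestamp); }
}

// The single check point: whichever operator emits a watermark goes through
// this wrapper, so the operators themselves never check the status.
class StatusCheckingOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private final StatusHolder status;

    StatusCheckingOutput(WatermarkOutput downstream, StatusHolder status) {
        this.downstream = downstream;
        this.status = status;
    }

    @Override
    public void emitWatermark(long timestamp) {
        // Watermarks are dropped while the stream is idle.
        if (status.get() == Status.ACTIVE) {
            downstream.emitWatermark(timestamp);
        }
    }
}
```

The upside of this layout is that a new watermark-emitting operator needs no extra code. The downside, in this sketch, is that the output must hold a reference to the status holder, which is the kind of coupling the issue description calls a cyclic dependency with StreamStatusMaintainer.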

I have already confirmed this with [~tzulitai] and [~pnowojski], and we agreed 
to keep the current approach. The new source API will trigger watermarks only 
from the source, which should make this logic easier to handle. At that point 
we might remove or consolidate the existing checks in the different outputs.

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As we know, a watermark can be emitted downstream only when the stream 
> status is active. For downstream tasks, the StatusWatermarkValve in 
> StreamInputProcessor already handles this logic. For the source task, 
> however, the current implementation of this logic is a bit tricky. 
> There are two scenarios for the source case:
>  * In the source WatermarkContext, the status is toggled to active while 
> collecting/emitting, and the status is checked in RecordWriterOutput. If the 
> watermark is triggered by a timer in AutomaticWatermarkContext, the timer 
> task checks the status before emitting the watermark. 
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a 
> timer, but the operator still relies on RecordWriterOutput to check the 
> status before emitting.
> So the check logic in RecordWriterOutput only makes sense for the last 
> scenario and seems redundant for the first one.
> Even worse, this logic in RecordWriterOutput introduces a cyclic dependency 
> with StreamStatusMaintainer, which blocks the follow-up work of 
> integrating source processing on the runtime side.
> To solve the above issues, the basic idea is to move this check logic to the 
> upper layer instead of the current low-level RecordWriterOutput, i.e. to 
> migrate the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. We could then also remove the 
> logic of toggling active in WatermarkContext.
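For the input side mentioned in the description, the sketch below shows in simplified form what a status-aware valve does: it combines per-channel watermarks as the minimum over the currently active channels and ignores idle ones. `SimpleStatusWatermarkValve` is a hypothetical illustration, not Flink's `StatusWatermarkValve`, whose rules for channels rejoining after idleness are more involved.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical, simplified valve; not Flink's StatusWatermarkValve.
class SimpleStatusWatermarkValve {
    private final long[] channelWatermarks;
    private final boolean[] channelActive;
    private long lastOutputWatermark = Long.MIN_VALUE;

    SimpleStatusWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelActive = new boolean[numChannels];
        Arrays.fill(channelActive, true);
    }

    // Returns a new combined watermark if it advanced, otherwise empty.
    OptionalLong inputWatermark(int channel, long watermark) {
        if (!channelActive[channel]) {
            return OptionalLong.empty(); // ignore watermarks from idle channels
        }
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return tryAdvance();
    }

    // Marks a channel active or idle and re-evaluates the combined watermark.
    OptionalLong inputStatus(int channel, boolean active) {
        channelActive[channel] = active;
        return tryAdvance();
    }

    // Combined watermark = minimum over active channels, emitted only if it advances.
    private OptionalLong tryAdvance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelActive[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```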



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911537#comment-16911537
 ] 

zhijiang commented on FLINK-13798:
--

Thanks for the heads-up, [~StephanEwen]. I hope this will make the watermark 
logic simpler in the new source operator. For now, though, we have to solve 
this issue by refactoring TimestampsAndPeriodicWatermarksOperator. :(



[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911455#comment-16911455
 ] 

zhijiang commented on FLINK-13798:
--

Thanks for the replies, [~StephanEwen].

I hope it will make the logic easier to understand in the new source operator, 
as you mentioned. For now, though, we have to solve this issue during the 
refactoring. :(

I just found another issue that was not mentioned in our previous discussion. 
If we remove this check from RecordWriterOutput and add it to 
TimestampsAndPeriodicWatermarksOperator, the AutomaticWatermarkContext case 
seems fine. But for the second case above, ManualWatermarkContext, I am not 
sure whether it still needs the check as it does now. Could you help 
double-check it? [~tzulitai]
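The proposal above moves the status check into the periodic operator itself. A minimal sketch of that shape follows; `PeriodicWatermarkOperator` and its `BooleanSupplier` status lookup are hypothetical and are not Flink's `TimestampsAndPeriodicWatermarksOperator`. Lateness bounds and watermark strategies are omitted, so the watermark is simply the largest timestamp seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical operator, loosely modeled on a periodic watermark assigner.
// It is NOT Flink's TimestampsAndPeriodicWatermarksOperator.
class PeriodicWatermarkOperator {
    private final BooleanSupplier isActive; // hypothetical status lookup
    private final List<Long> emitted = new ArrayList<>();
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicWatermarkOperator(BooleanSupplier isActive) {
        this.isActive = isActive;
    }

    void processElement(long timestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
    }

    // Invoked by a periodic processing-time timer.
    void onProcessingTime() {
        long candidate = maxTimestampSeen;
        // The status check lives in the operator itself; every other operator
        // that emits watermarks would need an equivalent check.
        if (isActive.getAsBoolean() && candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emittedWatermarks() { return emitted; }
}
```

This covers the periodic case, but, as the question above points out, paths such as a manually emitting context would still need their own equivalent check once the output no longer performs it.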

 

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, a watermark can be emitted downstream only when the stream 
> status is active. For downstream tasks, the StatusWatermarkValve in 
> StreamInputProcessor already handles this logic. For the source task, 
> however, the current implementation of this logic is a bit tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, the status is toggled to active 
> while collecting records, and the status is checked in RecordWriterOutput. 
> If the watermark is triggered by a timer, the timer task checks the status 
> before emitting the watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting the watermark.
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled by a 
> timer at a fixed interval. When the timer fires, the operator emits the 
> watermark via its output, and RecordWriterOutput checks the status before 
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the last two 
> scenarios and seems redundant for the first one.
> Even worse, the logic in RecordWriterOutput introduces a cyclic dependency 
> with StreamStatusMaintainer, which blocks the follow-up work of 
> integrating source processing on the runtime side.
> To solve the above issues, the basic idea is to move this check logic to the 
> upper layer instead of the current low-level RecordWriterOutput. The 
> solution is to migrate the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. The toggling-active action could 
> then be removed from AutomaticWatermarkContext when emitting records.
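The first scenario can be sketched to show why the output-side check is redundant there: the record path always sets the status to active before emitting, and the timer path checks the status itself. `ToggleActiveSourceContext` and its methods are hypothetical and only loosely follow the behavior described for AutomaticWatermarkContext; they are not Flink's `WatermarkContext` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical source context, loosely following the AutomaticWatermarkContext
// behavior described above; not Flink's actual WatermarkContext API.
class ToggleActiveSourceContext {
    enum Status { ACTIVE, IDLE }

    private Status status = Status.ACTIVE;
    private final List<String> output = new ArrayList<>();

    // Record path: the status is toggled to ACTIVE before emitting,
    // so a later status check on this path can never filter anything.
    void collect(String record) {
        status = Status.ACTIVE;
        output.add("record:" + record);
    }

    void markAsTemporarilyIdle() {
        status = Status.IDLE;
    }

    // Timer path: the timer task checks the status itself.
    void onWatermarkTimer(long watermark) {
        if (status == Status.ACTIVE) {
            output.add("watermark:" + watermark);
        }
    }

    List<String> emitted() { return output; }
}
```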





[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911428#comment-16911428
 ] 

Stephan Ewen commented on FLINK-13798:
--

FYI: I believe one outcome of FLIP-27 will be to drop the 
{{TimestampsAndPeriodicWatermarksOperator}} and have watermark generation 
strictly in the source operator.

The reason is that this allows for transparent "push down" of watermark 
generation to the source partition level (like a Kafka topic partition), and 
we should avoid having two slightly different ways to generate watermarks.
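To illustrate what partition-level watermark generation means, the sketch below tracks the maximum timestamp per source partition and derives the source watermark as the minimum across partitions, so a slow partition holds the watermark back instead of being overtaken. `PerPartitionWatermarks` is a hypothetical class, not part of FLIP-27 or any Flink API; out-of-orderness bounds and idle-partition handling are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of per-partition watermark tracking inside a source.
class PerPartitionWatermarks {
    private final Map<String, Long> maxTimestampPerPartition = new HashMap<>();

    // Remember the highest timestamp seen in each partition.
    void onRecord(String partition, long timestamp) {
        maxTimestampPerPartition.merge(partition, timestamp, Long::max);
    }

    // Source-level watermark: the minimum over all partitions seen so far,
    // or empty if no partition has produced a record yet.
    OptionalLong currentWatermark() {
        return maxTimestampPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .min();
    }
}
```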

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, a watermark can be emitted downstream only when the stream 
> status is active. For downstream tasks, the StatusWatermarkValve in 
> StreamInputProcessor already handles this logic. For the source task, 
> however, the current implementation of this logic is a bit tricky. 
> There are two scenarios for the source case:
>  * Emitting watermarks via the source context: the specific WatermarkContext 
> toggles the stream status to active before collecting/emitting 
> records/watermarks. RecordWriterOutput then checks that the status is 
> active before actually emitting the watermark.
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a 
> timer at a fixed interval. When the timer fires, the operator calls the 
> output stack to emit the watermark, and RecordWriterOutput checks the status 
> before actually emitting it.
> So the status check in RecordWriterOutput only matters for the second 
> scenario above; it is redundant for the first scenario because 
> WatermarkContext always toggles the active status before emitting. Even 
> worse, the logic in RecordWriterOutput introduces a cyclic dependency with 
> StreamStatusMaintainer, which blocks the follow-up work of integrating 
> source processing on the runtime side.
> The solution is to migrate the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. 





[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911275#comment-16911275
 ] 

zhijiang commented on FLINK-13798:
--

Thanks for the offline discussion and suggestions, [~pnowojski] [~tzulitai].

Could you double-check whether my understanding above is right?

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, a watermark can be emitted downstream only when the stream 
> status is active. For downstream tasks, the StatusWatermarkValve in 
> StreamInputProcessor already handles this logic. For the source task, 
> however, the current implementation of this logic is a bit tricky. 
> There are two scenarios for the source case:
>  * Emitting watermarks via the source context: the specific WatermarkContext 
> toggles the stream status to active before collecting/emitting 
> records/watermarks. RecordWriterOutput then checks that the status is 
> active before actually emitting the watermark.
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a 
> timer at a fixed interval. When the timer fires, the operator calls the 
> output stack to emit the watermark, and RecordWriterOutput checks the status 
> before actually emitting it.
> So the status check in RecordWriterOutput only matters for the second 
> scenario above; it is redundant for the first scenario because 
> WatermarkContext always toggles the active status before emitting. Even 
> worse, the logic in RecordWriterOutput introduces a cyclic dependency with 
> StreamStatusMaintainer, which blocks the follow-up work of integrating 
> source processing on the runtime side.
> The solution is to migrate the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. We could also remove the 
> toggle-active logic in the existing WatermarkContext.


