[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-06-21 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31901:
-
Fix Version/s: ml-2.4.0
   (was: ml-2.3.0)

> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> Key: FLINK-31901
> URL: https://issues.apache.org/jira/browse/FLINK-31901
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.4.0
>
>
> Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
> until all broadcast inputs are processed. After the broadcast variables are 
> ready, we first process the cached records and then continue processing 
> newly arrived records.
>  
> Cached elements are processed by invoking `Input#processElement` and 
> `Input#processWatermark`. However, processing them may take a long time when 
> many records are cached, which can block checkpoint barriers until all 
> cached records are processed.
>  
> Running the code snippet in [1] produces logs like the following.
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
> processed cached records, cnt: 1 at time: 1682319149569
> processed cached records, cnt: 2 at time: 1682319149614
> processed cached records, cnt: 3 at time: 1682319149655
> processed cached records, cnt: 4 at time: 1682319149702
> processed cached records, cnt: 5 at time: 1682319149746
> processed cached records, cnt: 6 at time: 1682319149781
> processed cached records, cnt: 7 at time: 1682319149891
> processed cached records, cnt: 8 at time: 1682319150011
> processed cached records, cnt: 9 at time: 1682319150116
> processed cached records, cnt: 10 at time: 1682319150199
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007{code}
>  
> From line#2 to line#11 there are no checkpoints: the barriers are blocked 
> until all cached elements are processed, which takes ~600ms, much longer than 
> the checkpoint interval (100ms).
>  
>   [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-05-26 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31901:
--
Description: 
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
until all broadcast inputs are processed. After the broadcast variables are 
ready, we first process the cached records and then continue processing 
newly arrived records.
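A minimal, hypothetical sketch of the cache-then-replay behaviour described above, in plain Java without any Flink dependency. `CachingWrapper`, `processElement` and `onBroadcastReady` are illustrative names only; this is not the actual `BroadcastUtils` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of caching non-broadcast input until broadcast variables are ready. */
public class CacheThenReplaySketch {

    /** Buffers non-broadcast records until the (simulated) broadcast variables are ready. */
    static final class CachingWrapper<T> {
        private final List<T> cache = new ArrayList<>();
        private final Consumer<T> downstream;
        private boolean broadcastReady = false;

        CachingWrapper(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        /** Called for every non-broadcast record. */
        void processElement(T record) {
            if (broadcastReady) {
                downstream.accept(record); // broadcast variables available: process directly
            } else {
                cache.add(record);         // not ready yet: cache the record
            }
        }

        /** Called once all broadcast inputs have been consumed. */
        void onBroadcastReady() {
            broadcastReady = true;
            // Replay every cached record, in arrival order, within this single call.
            for (T record : cache) {
                downstream.accept(record);
            }
            cache.clear();
        }
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        CachingWrapper<String> wrapper = new CachingWrapper<>(out::add);
        wrapper.processElement("a"); // cached
        wrapper.processElement("b"); // cached
        wrapper.onBroadcastReady();  // replays "a" and "b"
        wrapper.processElement("c"); // processed directly
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b, c]
    }
}
```

Note that `onBroadcastReady` drains the whole cache in one call; with many cached records, that single long call is the situation this issue describes.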

 

Cached elements are processed by invoking `Input#processElement` and 
`Input#processWatermark`. However, processing them may take a long time when 
many records are cached, which can block checkpoint barriers until all 
cached records are processed.
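To illustrate why the barrier is blocked, here is a simplified, hypothetical model of a single-threaded task in plain Java: each queued action runs to completion before the next one starts, so a barrier that arrives during a long replay has to wait. It does not use Flink's real mailbox classes; `simulate`, `cachedRecords` and `chunkSize` are illustrative. Replaying in bounded chunks and re-queuing the remainder is shown only as one possible mitigation, not necessarily the fix adopted for this issue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded "mailbox" model; not Flink's actual MailboxProcessor API. */
public class BarrierBlockingSketch {

    /**
     * Replays cachedRecords cached records, at most chunkSize (>= 1) per action.
     * A checkpoint barrier arrives while the first replay action is running.
     * Returns the event trace in execution order.
     */
    static List<String> simulate(int cachedRecords, int chunkSize) {
        Deque<Runnable> mailbox = new ArrayDeque<>();
        List<String> trace = new ArrayList<>();
        int[] next = {1};                    // next cached record to replay (1-based)
        boolean[] barrierQueued = {false};   // whether the simulated barrier has arrived
        Runnable[] replay = new Runnable[1]; // self-reference so the action can re-queue itself

        replay[0] = () -> {
            // Replay at most chunkSize records in this action.
            int end = (int) Math.min((long) next[0] + chunkSize - 1, cachedRecords);
            while (next[0] <= end) {
                trace.add("record " + next[0]);
                next[0]++;
                if (!barrierQueued[0]) {
                    // The barrier arrives now, but can only run after this action returns.
                    barrierQueued[0] = true;
                    mailbox.addLast(() -> trace.add("checkpoint"));
                }
            }
            if (next[0] <= cachedRecords) {
                // Yield: queue the remaining replay work behind the barrier.
                mailbox.addLast(replay[0]);
            }
        };

        mailbox.addLast(replay[0]);
        while (!mailbox.isEmpty()) {
            mailbox.pollFirst().run(); // each action runs to completion, one at a time
        }
        return trace;
    }

    public static void main(String[] args) {
        // All 4 cached records replayed in one action: the barrier waits until the end.
        System.out.println(simulate(4, 4)); // [record 1, record 2, record 3, record 4, checkpoint]
        // 2 records per action: the barrier is handled between chunks.
        System.out.println(simulate(4, 2)); // [record 1, record 2, checkpoint, record 3, record 4]
    }
}
```

The chunk size trades replay throughput against how long a barrier may wait; in this model the worst-case wait is one chunk instead of the whole cache.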

 

Running the code snippet in [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007{code}
 

From line#2 to line#11 there are no checkpoints: the barriers are blocked 
until all cached elements are processed, which takes ~600ms, much longer than 
the checkpoint interval (100ms).
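The timings quoted above can be recomputed from the log timestamps (epoch milliseconds). The 100ms interval is the value stated in the text; the class and method names are illustrative.

```java
/** Recomputes the timings in the log above; timestamps are copied from the log. */
public class CheckpointGapSketch {
    static final long CHECKPOINT_1_MS = 1682319149462L;
    static final long FIRST_CACHED_MS = 1682319149569L;
    static final long LAST_CACHED_MS = 1682319150199L;
    static final long CHECKPOINT_2_MS = 1682319150378L;
    // Checkpoint interval stated in the issue text.
    static final long CHECKPOINT_INTERVAL_MS = 100L;

    /** Time between the first and the last "processed cached records" log line. */
    static long cachedProcessingMs() {
        return LAST_CACHED_MS - FIRST_CACHED_MS;
    }

    /** Time between checkpoint 1 and checkpoint 2. */
    static long checkpointGapMs() {
        return CHECKPOINT_2_MS - CHECKPOINT_1_MS;
    }

    public static void main(String[] args) {
        System.out.println("cached records processed in " + cachedProcessingMs() + " ms"); // 630 ms
        System.out.println("checkpoint 1 -> 2 took " + checkpointGapMs() + " ms");         // 916 ms
        System.out.println((checkpointGapMs() / CHECKPOINT_INTERVAL_MS)
                + " full checkpoint intervals elapsed between checkpoints 1 and 2");     // 9
    }
}
```

This gives about 630ms of replay (consistent with the ~600ms above) and a 916ms gap between checkpoints 1 and 2, versus 72 to 98ms between consecutive checkpoints from id 3 to id 8.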

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

  was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
until all broadcast inputs are processed. After the broadcast variables are 
ready, we first process the cached records and then continue processing 
newly arrived records.

 

Cached elements are processed by invoking `Input#processElement` and 
`Input#processWatermark`. However, processing them may take a long time when 
many records are cached, which can block checkpoint barriers until all 
cached records are processed.

 

Running the code snippet in [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007{code}
 

From line#3 to line#12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed, which takes ~600ms, much longer than 
the checkpoint interval (100ms).

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case


> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> 

[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31901:
--
Description: 
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
until all broadcast inputs are processed. After the broadcast variables are 
ready, we first process the cached records and then continue processing 
newly arrived records.

 

Cached elements are processed by invoking `Input#processElement` and 
`Input#processWatermark`. However, processing them may take a long time when 
many records are cached, which can block checkpoint barriers until all 
cached records are processed.

 

Running the code snippet in [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
processed cached records, cnt: 1 at time: 1682319149569
processed cached records, cnt: 2 at time: 1682319149614
processed cached records, cnt: 3 at time: 1682319149655
processed cached records, cnt: 4 at time: 1682319149702
processed cached records, cnt: 5 at time: 1682319149746
processed cached records, cnt: 6 at time: 1682319149781
processed cached records, cnt: 7 at time: 1682319149891
processed cached records, cnt: 8 at time: 1682319150011
processed cached records, cnt: 9 at time: 1682319150116
processed cached records, cnt: 10 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007{code}
 

From line#3 to line#12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed, which takes ~600ms, much longer than 
the checkpoint interval (100ms).

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

  was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
until all broadcast inputs are processed. After the broadcast variables are 
ready, we first process the cached records and then continue processing 
newly arrived records.

 

Cached elements are processed by invoking `Input#processElement` and 
`Input#processWatermark`. However, processing them may take a long time when 
many records are cached, which can block checkpoint barriers until all 
cached records are processed.

 

Running the code snippet in [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 1
processed cached records, cnt: 2
processed cached records, cnt: 3
processed cached records, cnt: 4
processed cached records, cnt: 5
processed cached records, cnt: 6
processed cached records, cnt: 7
processed cached records, cnt: 8
processed cached records, cnt: 9
processed cached records, cnt: 10
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
 

From line#3 to line#12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed.

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case


> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> Key: FLINK-31901
> URL: https://issues.apache.org/jira/browse/FLINK-31901
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.3.0
>
>
> Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
> until all broadcast inputs are processed. After the broadcast variables are 
> ready, we first process the cached records and then continue processing 
> newly arrived records.

[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

2023-04-24 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31901:
--
Description: 
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
until all broadcast inputs are processed. After the broadcast variables are 
ready, we first process the cached records and then continue processing 
newly arrived records.

 

Cached elements are processed by invoking `Input#processElement` and 
`Input#processWatermark`. However, processing them may take a long time when 
many records are cached, which can block checkpoint barriers until all 
cached records are processed.

 

Running the code snippet in [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 1
processed cached records, cnt: 2
processed cached records, cnt: 3
processed cached records, cnt: 4
processed cached records, cnt: 5
processed cached records, cnt: 6
processed cached records, cnt: 7
processed cached records, cnt: 8
processed cached records, cnt: 9
processed cached records, cnt: 10
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
 

From line#3 to line#12 there are no checkpoints: the barriers are blocked 
until all cached elements are processed.

 

  [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

  was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
until all broadcast inputs are processed. After the broadcast variables are 
ready, we first process the cached records and then continue processing 
newly arrived records.

 

Cached elements are processed by invoking `Input#processElement` and 
`Input#processWatermark`. However, processing them may take a long time when 
many records are cached, which can block checkpoint barriers until all 
cached records are processed.


> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> 
>
> Key: FLINK-31901
> URL: https://issues.apache.org/jira/browse/FLINK-31901
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
> Fix For: ml-2.3.0
>
>
> Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input 
> until all broadcast inputs are processed. After the broadcast variables are 
> ready, we first process the cached records and then continue processing 
> newly arrived records.
>  
> Cached elements are processed by invoking `Input#processElement` and 
> `Input#processWatermark`. However, processing them may take a long time when 
> many records are cached, which can block checkpoint barriers until all 
> cached records are processed.
>  
> Running the code snippet in [1] produces logs like the following.
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
> processed cached records, cnt: 1
> processed cached records, cnt: 2
> processed cached records, cnt: 3
> processed cached records, cnt: 4
> processed cached records, cnt: 5
> processed cached records, cnt: 6
> processed cached records, cnt: 7
> processed cached records, cnt: 8
> processed cached records, cnt: 9
> processed cached records, cnt: 10
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
>  
> From line#3 to line#12 there are no checkpoints: the barriers are blocked 
> until all cached elements are processed.
>  
>   [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case


