[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2023-02-27 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-29816:
---
Affects Version/s: 1.16.1
   1.17.0
   (was: 1.14.0)
   (was: 1.16.0)

> Userfunction exception in ProcessWindowFunction was called before invoke 
> during restore state(subtask was in INITIALIZING state), but SteamTask skip 
> handle Exception
> -
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Xie Yi
> Assignee: RocMarshal
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2022-10-31-19-49-52-432.png, 
> image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, 
> image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, 
> image-2022-11-02-11-10-25-508.png, image-2023-02-22-17-26-06-200.png
>
>
> h4. 1. How to reproduce
> Use a ProcessWindowFunction and throw an exception in process().
> Test code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.enableCheckpointing(60 * 1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setCheckpointTimeout(6);
>     KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
>             .setBootstrapServers("")
>             .setTopics("")
>             .setGroupId("")
>             .setValueOnlyDeserializer(new SimpleStringSchema())
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .build();
>     DataStreamSource<String> kafkaSource =
>             env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
>     SingleOutputStreamOperator<String> mapSourse = kafkaSource
>             .keyBy(s -> s)
>             .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
>             .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
>                 @Override
>                 public void process(String s, Context context,
>                         Iterable<String> iterable, Collector<String> collector) throws Exception {
>                     // Processing the event "abc" throws java.lang.NumberFormatException.
>                     Integer intS = Integer.valueOf(s);
>                     collector.collect(s);
>                 }
>             })
>             .name("name-process").uid("uid-process");
>     mapSourse.print();
>     env.execute();
> }
> {code}
> Kafka input events:
> {code:java}
> >1
> >1
> >2
> >2
> >3
> >3
> >abc
> >abc
> >
> {code}
> h4. 2. Observed behavior
> When the job processes the event "abc", it throws 
> java.lang.NumberFormatException and fails over. The job is expected to 
> keep restarting and failing over continuously.
> However, it fails over only twice (attempt 0 and attempt 1); on the 
> third attempt it runs normally, with no exception.
> !image-2022-10-31-19-54-12-546.png!
> Checkpoint 1 completed in attempt 1, before failover exception 1:
> {code:java}
> 2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
> 2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).
> {code}
>  
> Attempt 2 was restored from the checkpoint:
> {code:java}
> 2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
> {code}
>  
>  
> h4. 3. Possible cause
> During attempt 2, while the task was restoring from the checkpoint, the user 
> function in ProcessWindowFunction was called inside StreamTask.restore and threw 
> java.lang.NumberFormatException. However, StreamTask caught the exception and 
> did not handle it because the subtask was not in the RUNNING state.
> *Stack trace in attempt 2*
> The user function was called in StreamTask.restore (subtask state is INITIALIZING):
> {code:java}
> java.lang.Thread.getStackTrace(Thread.java:1552)
> 

[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2023-02-27 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-29816:
---
Fix Version/s: 1.17.0


[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2023-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29816:
---
Labels: pull-request-available  (was: )


[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2023-02-22 Thread RocMarshal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RocMarshal updated FLINK-29816:
---
Attachment: image-2023-02-22-17-26-06-200.png


[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2022-11-11 Thread Fabian Paul (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul updated FLINK-29816:

Affects Version/s: 1.15.3
   (was: 1.15.2)


[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2022-11-01 Thread Xie Yi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xie Yi updated FLINK-29816:
---
Attachment: image-2022-11-02-10-42-21-099.png


[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2022-11-01 Thread Xie Yi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xie Yi updated FLINK-29816:
---
Description: 
h4. 1. How to reproduce

Use a ProcessWindowFunction and throw an exception in process().
Test code:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);

    KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
            .setBootstrapServers("")
            .setTopics("")
            .setGroupId("")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();

    DataStreamSource<String> kafkaSource =
            env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

    SingleOutputStreamOperator<String> mapSourse = kafkaSource
            .keyBy(s -> s)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context,
                        Iterable<String> iterable, Collector<String> collector) throws Exception {
                    // Processing the event "abc" throws java.lang.NumberFormatException.
                    Integer intS = Integer.valueOf(s);
                    collector.collect(s);
                }
            })
            .name("name-process").uid("uid-process");

    mapSourse.print();
    env.execute();
}
{code}
Kafka input events:
{code:java}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}
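The parsing failure itself can be checked without Kafka or Flink. The following is a minimal sketch, assuming the eight sample events above are passed through the same Integer.valueOf call that process() uses; the class and method names are hypothetical and not part of the test job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone check; not part of the reporter's job.
final class ParseEventsSketch {

    // Returns the events for which Integer.valueOf throws NumberFormatException.
    static List<String> failingEvents(List<String> events) {
        List<String> failing = new ArrayList<>();
        for (String event : events) {
            try {
                Integer.valueOf(event);
            } catch (NumberFormatException e) {
                failing.add(event);
            }
        }
        return failing;
    }

    public static void main(String[] args) {
        // Same values as the Kafka input events listed above.
        List<String> events = Arrays.asList("1", "1", "2", "2", "3", "3", "abc", "abc");
        System.out.println(failingEvents(events)); // prints [abc, abc]
    }
}
```

Only the two "abc" events fail, so any window that contains them should fail every time it fires.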
h4. 2. Observed behavior

When the job processes the event "abc", it throws java.lang.NumberFormatException 
and fails over. The job is expected to keep restarting and failing over continuously.
However, it fails over only twice (attempt 0 and attempt 1); on the 
third attempt it runs normally, with no exception.
!image-2022-10-31-19-54-12-546.png!

Checkpoint 1 completed in attempt 1, before failover exception 1:
{code:java}
2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).
{code}
 

Attempt 2 was restored from the checkpoint:
{code:java}
2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}
 

 
h4. 3. Possible cause

During attempt 2, while the task was restoring from the checkpoint, the user 
function in ProcessWindowFunction was called inside StreamTask.restore and threw 
java.lang.NumberFormatException. However, StreamTask caught the exception and 
did not handle it because the subtask was not in the RUNNING state.
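The suspected mechanism can be sketched as follows. This is a simplified, hypothetical model of the behavior described above, not Flink's actual StreamTask code: the class, the two-value state enum, and the counters are all made up for illustration. It only shows how a failure raised by a queued callback can be lost when the failure handler acts solely in the RUNNING state.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the reported behaviour.
// It is NOT Flink's StreamTask; all names here are made up for illustration.
final class SwallowedFailureSketch {

    enum State { INITIALIZING, RUNNING }

    // A queued action, e.g. a processing-time timer callback.
    interface Action {
        void run() throws Exception;
    }

    private final Queue<Action> mailbox = new ArrayDeque<>();
    private State state = State.INITIALIZING;
    private int handled = 0;
    private int dropped = 0;

    void enqueue(Action action) { mailbox.add(action); }

    void setState(State newState) { state = newState; }

    // Runs every queued action; a failure is only acted on while RUNNING.
    void drainMailbox() {
        Action action;
        while ((action = mailbox.poll()) != null) {
            try {
                action.run();
            } catch (Exception e) {
                if (state == State.RUNNING) {
                    handled++;   // in a real task this would trigger a failover
                } else {
                    dropped++;   // silently lost: the symptom described above
                }
            }
        }
    }

    int handledFailures() { return handled; }

    int droppedFailures() { return dropped; }

    public static void main(String[] args) {
        SwallowedFailureSketch task = new SwallowedFailureSketch();

        // Callback fires while the task is still INITIALIZING (restore phase).
        task.enqueue(() -> Integer.valueOf("abc"));
        task.drainMailbox();

        // The same callback after the task has switched to RUNNING.
        task.setState(State.RUNNING);
        task.enqueue(() -> Integer.valueOf("abc"));
        task.drainMailbox();

        System.out.println("dropped=" + task.droppedFailures()
                + ", handled=" + task.handledFailures()); // prints dropped=1, handled=1
    }
}
```

In this model the window contents that fire during restore are consumed and their failure is dropped, which would match a third attempt that appears to run normally.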

*Stack trace in attempt 2*
The user function was called in StreamTask.restore (subtask state is INITIALIZING):
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)

[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2022-11-01 Thread Xie Yi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xie Yi updated FLINK-29816:
---
Affects Version/s: 1.14.0

> Userfunction exception in ProcessWindowFunction was called before invoke 
> during restore state(subtask was in INITIALIZING state), but SteamTask skip 
> handle Exception
> -
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0, 1.16.0, 1.15.2
>Reporter: Xie Yi
>Priority: Major
> Attachments: image-2022-10-31-19-49-52-432.png, 
> image-2022-10-31-19-54-12-546.png
>
>
> h4. 1. How to reproduce 
> Use a ProcessWindowFunction and throw an exception in process().
> Test code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.enableCheckpointing(60 * 1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setCheckpointTimeout(6);
> 
>     KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
>             .setBootstrapServers("")
>             .setTopics("")
>             .setGroupId("")
>             .setValueOnlyDeserializer(new SimpleStringSchema())
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .build();
> 
>     DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer,
>             WatermarkStrategy.noWatermarks(), "Kafka Source");
> 
>     // Key by the raw string and fire a processing-time window every 15 seconds.
>     SingleOutputStreamOperator<String> mapSourse = kafkaSource.keyBy(s -> s)
>             .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
>             .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
>                 @Override
>                 public void process(String s, Context context,
>                         Iterable<String> iterable, Collector<String> collector) throws Exception {
>                     // Processing the event "abc" throws java.lang.NumberFormatException.
>                     Integer intS = Integer.valueOf(s);
>                     collector.collect(s);
>                 }
>             })
>             .name("name-process").uid("uid-process");
> 
>     mapSourse.print();
>     env.execute();
> }
> {code}
> Kafka input events:
> {code:java}
> >1
> >1
> >2
> >2
> >3
> >3
> >abc
> >abc
> >
> {code}
> h4. 2. Fault phenomena
> When the job processes the event "abc", process() throws 
> java.lang.NumberFormatException and the job fails over. It should then keep 
> restarting and failing over.
> However, it fails over only twice (attempt 0 and attempt 1). On the third 
> attempt (attempt 2) it runs normally and reports no exception.
> !image-2022-10-31-19-54-12-546.png!
> Checkpoint 1 completed during attempt 1, before the exception that triggered 
> its failover:
> {code:java}
> 2022-10-31 16:59:53,644 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
> checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 
> 7bca78a75b089d447bb4c99efcfd6527.
> 2022-10-31 16:59:54,010 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
> checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, 
> checkpointDuration=333 ms, finalizationTime=72 ms).
> {code}
>  
> Attempt 2 was restored from checkpoint 1:
> {code:java}
> 2022-10-31 17:00:30,033 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 
> 7bca78a75b089d447bb4c99efcfd6527 located at 
> hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
> {code}
>  
>  
> h4. 3. Possible reasons
> During attempt 2 the user function in ProcessWindowFunction was called inside 
> StreamTask.restore and threw java.lang.NumberFormatException. However, 
> StreamTask caught the exception and did not handle it because the subtask was 
> not in the RUNNING state.
> *The stack trace in attempt 2*
> The user function was called in StreamTask.restore (subtask state is INITIALIZING):
> {code:java}
> java.lang.Thread.getStackTrace(Thread.java:1552)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
> 

[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2022-11-01 Thread Xie Yi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xie Yi updated FLINK-29816:
---
Description: 
h4. 1. How to reproduce 

Use a ProcessWindowFunction and throw an exception in process().
Test code:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);

    KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
            .setBootstrapServers("")
            .setTopics("")
            .setGroupId("")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();

    DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer,
            WatermarkStrategy.noWatermarks(), "Kafka Source");

    // Key by the raw string and fire a processing-time window every 15 seconds.
    SingleOutputStreamOperator<String> mapSourse = kafkaSource.keyBy(s -> s)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context,
                        Iterable<String> iterable, Collector<String> collector) throws Exception {
                    // Processing the event "abc" throws java.lang.NumberFormatException.
                    Integer intS = Integer.valueOf(s);
                    collector.collect(s);
                }
            })
            .name("name-process").uid("uid-process");

    mapSourse.print();
    env.execute();
}
{code}
Kafka input events:
{code:java}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}
h4. 2. Fault phenomena

When the job processes the event "abc", process() throws 
java.lang.NumberFormatException and the job fails over. It should then keep 
restarting and failing over.
However, it fails over only twice (attempt 0 and attempt 1). On the third 
attempt (attempt 2) it runs normally and reports no exception.
!image-2022-10-31-19-54-12-546.png!

Checkpoint 1 completed during attempt 1, before the exception that triggered 
its failover:
{code:java}
2022-10-31 16:59:53,644 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 
7bca78a75b089d447bb4c99efcfd6527.
2022-10-31 16:59:54,010 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, 
checkpointDuration=333 ms, finalizationTime=72 ms).
{code}
 

Attempt 2 was restored from checkpoint 1:
{code:java}
2022-10-31 17:00:30,033 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 
7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 
7bca78a75b089d447bb4c99efcfd6527 located at 
hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}
 

 
h4. 3. Possible reasons

During attempt 2 the user function in ProcessWindowFunction was called inside 
StreamTask.restore and threw java.lang.NumberFormatException. However, 
StreamTask caught the exception and did not handle it because the subtask was 
not in the RUNNING state.

*The stack trace in attempt 2*
The user function was called in StreamTask.restore (subtask state is INITIALIZING):
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)

[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

2022-10-31 Thread Xie Yi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xie Yi updated FLINK-29816:
---
Description: 
h4. 1. How to reproduce 

Use a ProcessWindowFunction and throw an exception in process().
Test code:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);

    KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
            .setBootstrapServers("")
            .setTopics("")
            .setGroupId("")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();

    DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer,
            WatermarkStrategy.noWatermarks(), "Kafka Source");

    // Key by the raw string and fire a processing-time window every 15 seconds.
    SingleOutputStreamOperator<String> mapSourse = kafkaSource.keyBy(s -> s)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context,
                        Iterable<String> iterable, Collector<String> collector) throws Exception {
                    // Processing the event "abc" throws java.lang.NumberFormatException.
                    Integer intS = Integer.valueOf(s);
                    collector.collect(s);
                }
            })
            .name("name-process").uid("uid-process");

    mapSourse.print();
    env.execute();
}
{code}
Kafka input events:
{code:java}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}
h4. 2. Fault phenomena

When the job processes the event "abc", process() throws 
java.lang.NumberFormatException and the job fails over. It should then keep 
restarting and failing over.
However, it fails over only twice (attempt 0 and attempt 1). On the third 
attempt (attempt 2) it runs normally and reports no exception.
!image-2022-10-31-19-54-12-546.png!
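
The failing input can be checked in isolation, outside of Flink. The sketch below feeds the same Kafka events through Integer.valueOf and collects the ones it rejects; ParseFailureSketch and unparsable are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParseFailureSketch {
    // Returns the events for which Integer.valueOf throws NumberFormatException.
    static List<String> unparsable(List<String> events) {
        List<String> rejected = new ArrayList<>();
        for (String event : events) {
            try {
                Integer.valueOf(event);
            } catch (NumberFormatException e) {
                rejected.add(event);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Same events as in the Kafka input listed in the report.
        List<String> events = Arrays.asList("1", "1", "2", "2", "3", "3", "abc", "abc");
        System.out.println(unparsable(events)); // prints [abc, abc]
    }
}
```

Only the "abc" events fail, so every window firing for the key "abc" is expected to throw, regardless of which attempt evaluates it.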
h4. 3. Possible reasons

Attempt 2 was restored from checkpoint 1:
{code:java}
2022-10-31 17:00:30,033 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 
7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 
7bca78a75b089d447bb4c99efcfd6527 located at 
hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}
*The stack trace in the third attempt*
The user function was called in StreamTask.restore (subtask state is INITIALIZING):
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
java.lang.Thread.run(Thread.java:745)
{code}
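
One plausible reading of this trace is that a processing-time timer saved in checkpoint 1 was already overdue when attempt 2 restored it, so it fired inside the mailbox loop run by restoreInternal. The sketch below only checks the arithmetic behind that reading. It assumes the window of the checkpointed elements is the 15-second tumbling window containing the checkpoint trigger timestamp from the log, that the timer is set to the window end minus 1 ms, and that the restore happened about 36 seconds after the trigger (taken from the log wall-clock times). All class and method names are hypothetical.

```java
public class OverdueTimerSketch {
    // Start of the tumbling window (no offset) that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // A timer is overdue if its timestamp is not later than the current time.
    static boolean isOverdue(long timerMs, long nowMs) {
        return timerMs <= nowMs;
    }

    public static void main(String[] args) {
        long windowSizeMs = 15_000L;                  // Time.seconds(15) in the test job
        long checkpointTriggerMs = 1667206793605L;    // from the CheckpointCoordinator log
        long restoreMs = checkpointTriggerMs + 36_000L; // assumption: about 36 s later

        long start = windowStart(checkpointTriggerMs, windowSizeMs);
        long end = start + windowSizeMs;
        long timerMs = end - 1;                       // assumed timer timestamp

        System.out.println("window = [" + start + ", " + end + ")");
        System.out.println("timer overdue at restore: " + isOverdue(timerMs, restoreMs));
    }
}
```

Under these assumptions the window ends at 1667206800000, well before the restore time, so the restored timer would fire as soon as the mailbox is processed.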
Stack trace (which caused the failover) in attempt 0 and attempt 1:
user function