[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo updated FLINK-29816:
---
    Affects Version/s: 1.16.1
                       1.17.0
                       (was: 1.14.0)
                       (was: 1.16.0)

> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Xie Yi
> Assignee: RocMarshal
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
> Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, image-2022-11-02-11-10-25-508.png, image-2023-02-22-17-26-06-200.png
>
> h4. 1. How to reproduce
> Use a ProcessWindowFunction and throw an exception in process().
> Test code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.enableCheckpointing(60 * 1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setCheckpointTimeout(6);
>     KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
>             .setBootstrapServers("")
>             .setTopics("")
>             .setGroupId("")
>             .setValueOnlyDeserializer(new SimpleStringSchema())
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .build();
>     DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
>     SingleOutputStreamOperator<String> mapSourse = kafkaSource
>             .keyBy(s -> s)
>             .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
>             .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
>                 @Override
>                 public void process(String s, Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
>                     // Processing the event "abc" throws java.lang.NumberFormatException.
>                     Integer intS = Integer.valueOf(s);
>                     collector.collect(s);
>                 }
>             })
>             .name("name-process").uid("uid-process");
>     mapSourse.print();
>     env.execute();
> }
> {code}
> Kafka input events:
> {code:java}
> >1
> >1
> >2
> >2
> >3
> >3
> >abc
> >abc
> >
> {code}
> h4. 2. Observed behavior
> When the job processes the event "abc", it throws java.lang.NumberFormatException and fails over. It would be expected to keep restarting and failing on the same event.
> However, it failed over only 2 times (attempt 0, attempt 1). On the third attempt it ran normally, with no exception.
> !image-2022-10-31-19-54-12-546.png!
> Checkpoint 1 completed in attempt 1, before failover exception 1:
> {code:java}
> 2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
> 2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).
> {code}
>
> Attempt 2 was restored from that checkpoint:
> {code:java}
> 2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
> {code}
>
> h4. 3. Possible cause
> During attempt 2, the task restored from the checkpoint. The user function in ProcessWindowFunction was called in StreamTask.restore and threw java.lang.NumberFormatException. However, StreamTask caught the exception and did not handle it, because the subtask was not in the RUNNING state.
> *The stack trace in attempt 2*
> The user function was called in StreamTask.restore (subtask state is INITIALIZING):
> {code:java}
> java.lang.Thread.getStackTrace(Thread.java:1552)
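The suspected cause in section 3 can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: `ToyTask`, `State`, and `handleAsyncException` here are hypothetical names for illustration and are not Flink's StreamTask code. It shows how a failure handler that acts only in the RUNNING state would silently drop an exception thrown by a user callback while the task is still INITIALIZING.

```java
import java.util.ArrayList;
import java.util.List;

public class SwallowedFailureSketch {

    /** Hypothetical two-state lifecycle; the real task lifecycle has more states. */
    public enum State { INITIALIZING, RUNNING }

    /** Toy stand-in for a task. This is an assumption for illustration, not Flink code. */
    static final class ToyTask {
        State state = State.INITIALIZING;
        final List<Throwable> reportedFailures = new ArrayList<>();

        /** Runs a user callback and routes any runtime exception to the handler. */
        void runUserCallback(Runnable callback) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                handleAsyncException(e);
            }
        }

        /** Reports the failure only when RUNNING; otherwise the exception is dropped. */
        void handleAsyncException(Throwable t) {
            if (state == State.RUNNING) {
                reportedFailures.add(t);
            }
        }
    }

    /** Returns how many failures the toy task reported for the given state and input. */
    public static int failuresReported(State stateAtFiring, String input) {
        ToyTask task = new ToyTask();
        task.state = stateAtFiring;
        // Same parsing call as the reproducer; "abc" throws NumberFormatException.
        task.runUserCallback(() -> Integer.valueOf(input));
        return task.reportedFailures.size();
    }

    public static void main(String[] args) {
        System.out.println(failuresReported(State.INITIALIZING, "abc"));
        System.out.println(failuresReported(State.RUNNING, "abc"));
    }
}
```

In this model the same bad input leads to a reported failure only when the callback fires in the RUNNING state, which would match a job that stops failing over once the offending window is emitted during restore.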
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo updated FLINK-29816:
---
    Fix Version/s: 1.17.0

> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.14.0, 1.16.0, 1.15.3
> Reporter: Xie Yi
> Assignee: RocMarshal
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
> Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, image-2022-11-02-11-10-25-508.png, image-2023-02-22-17-26-06-200.png
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-29816:
---
    Labels: pull-request-available (was: )

> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.14.0, 1.16.0, 1.15.3
> Reporter: Xie Yi
> Assignee: RocMarshal
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, image-2022-11-02-11-10-25-508.png, image-2023-02-22-17-26-06-200.png
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

RocMarshal updated FLINK-29816:
---
    Attachment: image-2023-02-22-17-26-06-200.png

> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.14.0, 1.16.0, 1.15.3
> Reporter: Xie Yi
> Assignee: RocMarshal
> Priority: Major
> Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, image-2022-11-02-11-10-25-508.png, image-2023-02-22-17-26-06-200.png
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Paul updated FLINK-29816:
    Affects Version/s: 1.15.3
                       (was: 1.15.2)

> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.14.0, 1.16.0, 1.15.3
> Reporter: Xie Yi
> Priority: Major
> Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, image-2022-11-02-11-10-25-508.png
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xie Yi updated FLINK-29816:
---
    Attachment: image-2022-11-02-10-42-21-099.png

> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.14.0, 1.16.0, 1.15.2
> Reporter: Xie Yi
> Priority: Major
> Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xie Yi updated FLINK-29816:
---
    Description:

h4. 1. How to reproduce
Use a ProcessWindowFunction and throw an exception in process().
Test code:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);
    KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
            .setBootstrapServers("")
            .setTopics("")
            .setGroupId("")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();
    DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
    SingleOutputStreamOperator<String> mapSourse = kafkaSource
            .keyBy(s -> s)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
                    // Processing the event "abc" throws java.lang.NumberFormatException.
                    Integer intS = Integer.valueOf(s);
                    collector.collect(s);
                }
            })
            .name("name-process").uid("uid-process");
    mapSourse.print();
    env.execute();
}
{code}
Kafka input events:
{code:java}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}
h4. 2. Observed behavior
When the job processes the event "abc", it throws java.lang.NumberFormatException and fails over. It would be expected to keep restarting and failing on the same event.
However, it failed over only 2 times (attempt 0, attempt 1). On the third attempt it ran normally, with no exception.
!image-2022-10-31-19-54-12-546.png!
Checkpoint 1 completed in attempt 1, before failover exception 1:
{code:java}
2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).
{code}

Attempt 2 was restored from that checkpoint:
{code:java}
2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}

h4. 3. Possible cause
During attempt 2, the task restored from the checkpoint. The user function in ProcessWindowFunction was called in StreamTask.restore and threw java.lang.NumberFormatException. However, StreamTask caught the exception and did not handle it, because the subtask was not in the RUNNING state.
*The stack trace in attempt 2*
The user function was called in StreamTask.restore (subtask state is INITIALIZING):
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
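The frames above pass through InternalTimerServiceImpl.onProcessingTime and WindowOperator.onProcessingTime, which suggests that a processing-time timer fired while the task was still restoring. One plausible reading, offered as an assumption rather than a confirmed diagnosis, is that the 15-second window open at checkpoint 1 had already ended by the time attempt 2 restored, so its timer was overdue. The arithmetic can be sketched as follows. The class and method names are hypothetical, the window alignment assumes a zero offset, and the restore time is derived from the log line (17:00:30,033) on the assumption that the log clock matches the checkpoint timestamp.

```java
public class OverdueWindowTimerSketch {

    /** Start of the tumbling window containing ts, assuming a zero window offset. */
    public static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    /** Last timestamp that still belongs to the window containing ts (window end minus 1 ms). */
    public static long windowMaxTimestamp(long ts, long sizeMillis) {
        return windowStart(ts, sizeMillis) + sizeMillis - 1;
    }

    /** A processing-time timer is due once the clock has reached its timestamp. */
    public static boolean isOverdue(long timerTimestamp, long nowMillis) {
        return timerTimestamp <= nowMillis;
    }

    public static void main(String[] args) {
        long windowSize = 15_000L;              // Time.seconds(15) in the reproducer
        long checkpointTs = 1667206793605L;     // "@ 1667206793605" from the checkpoint log
        long assumedRestoreTs = 1667206830033L; // assumption derived from the 17:00:30,033 log line

        long timerTs = windowMaxTimestamp(checkpointTs, windowSize);
        System.out.println("window start:   " + windowStart(checkpointTs, windowSize));
        System.out.println("timer fires at: " + timerTs);
        System.out.println("overdue at restore: " + isOverdue(timerTs, assumedRestoreTs));
    }
}
```

Under these assumptions the timer timestamp lies roughly 30 seconds before the restore time, so any window contents held in checkpoint 1 would be emitted as soon as timers are processed after restore.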
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xie Yi updated FLINK-29816:
---
    Affects Version/s: 1.14.0

> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
> -
>
>                 Key: FLINK-29816
>                 URL: https://issues.apache.org/jira/browse/FLINK-29816
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.14.0, 1.16.0, 1.15.2
>            Reporter: Xie Yi
>            Priority: Major
>         Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png
>
>
> h4. 1. How to reproduce
> Use a ProcessWindowFunction and throw an exception in process().
> Test code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.enableCheckpointing(60 * 1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setCheckpointTimeout(6);
>
>     KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
>             .setBootstrapServers("")
>             .setTopics("")
>             .setGroupId("")
>             .setValueOnlyDeserializer(new SimpleStringSchema())
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .build();
>
>     DataStreamSource<String> kafkaSource =
>             env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
>
>     SingleOutputStreamOperator<String> mapSourse = kafkaSource
>             .keyBy(s -> s)
>             .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
>             .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
>                 @Override
>                 public void process(String s, Context context,
>                         Iterable<String> iterable, Collector<String> collector) throws Exception {
>                     // Processing the event "abc" throws java.lang.NumberFormatException.
>                     Integer intS = Integer.valueOf(s);
>                     collector.collect(s);
>                 }
>             })
>             .name("name-process").uid("uid-process");
>
>     mapSourse.print();
>     env.execute();
> }
> {code}
> Kafka input events:
> {code:java}
> >1
> >1
> >2
> >2
> >3
> >3
> >abc
> >abc
> >
> {code}
> h4. 2. Fault phenomenon
> When the job processes the event "abc", it throws java.lang.NumberFormatException and fails over. The job is then expected to restart and fail over repeatedly.
> However, it only fails over twice (attempt 0 and attempt 1). On the third attempt it runs normally, with no exception.
> !image-2022-10-31-19-54-12-546.png!
> Checkpoint 1 completed in attempt 1, before failover exception 1:
> {code:java}
> 2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
> 2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).
> {code}
>
> Attempt 2 was restored from the checkpoint:
> {code:java}
> 2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
> {code}
>
>
> h4. 3. Possible cause
> During attempt 2, the user function in ProcessWindowFunction was called in StreamTask.restore and threw java.lang.NumberFormatException. However, StreamTask caught the exception and did not handle it because the subtask was not in the RUNNING state.
> *Stack trace in attempt 2*
> The user function was called in StreamTask.restore (subtask state is INITIALIZING):
> {code:java}
> java.lang.Thread.getStackTrace(Thread.java:1552)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xie Yi updated FLINK-29816:
---
    Description:
h4. 1. How to reproduce
Use a ProcessWindowFunction and throw an exception in process().
Test code:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);

    KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
            .setBootstrapServers("")
            .setTopics("")
            .setGroupId("")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();

    DataStreamSource<String> kafkaSource =
            env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

    SingleOutputStreamOperator<String> mapSourse = kafkaSource
            .keyBy(s -> s)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context,
                        Iterable<String> iterable, Collector<String> collector) throws Exception {
                    // Processing the event "abc" throws java.lang.NumberFormatException.
                    Integer intS = Integer.valueOf(s);
                    collector.collect(s);
                }
            })
            .name("name-process").uid("uid-process");

    mapSourse.print();
    env.execute();
}
{code}
Kafka input events:
{code:java}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}
h4. 2. Fault phenomenon
When the job processes the event "abc", it throws java.lang.NumberFormatException and fails over. The job is then expected to restart and fail over repeatedly.
However, it only fails over twice (attempt 0 and attempt 1). On the third attempt it runs normally, with no exception.
!image-2022-10-31-19-54-12-546.png!
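The failure trigger in the test code is the call to Integer.valueOf on a non-numeric event. As a minimal sketch independent of Flink and Kafka (the class name ParseFailureSketch and the helper parses are hypothetical, introduced only for illustration), the following shows which of the input events above would make process() throw:

```java
public class ParseFailureSketch {

    /** Returns true if Integer.valueOf accepts the value, false if it throws NumberFormatException. */
    static boolean parses(String value) {
        try {
            Integer.valueOf(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same distinct events as the Kafka input in the report.
        for (String event : new String[] {"1", "2", "3", "abc"}) {
            System.out.println(event + " -> " + (parses(event) ? "ok" : "NumberFormatException"));
        }
    }
}
```

Only "abc" fails to parse, so any window that contains it should fail each time it fires.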
Checkpoint 1 completed in attempt 1, before failover exception 1:
{code:java}
2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).
{code}

Attempt 2 was restored from the checkpoint:
{code:java}
2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}

h4. 3. Possible cause
During attempt 2, the user function in ProcessWindowFunction was called in StreamTask.restore and threw java.lang.NumberFormatException. However, StreamTask caught the exception and did not handle it because the subtask was not in the RUNNING state.
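The suspected cause can be illustrated with a toy model. This is not Flink's code: ToyTask, State, invokeTimerCallback, handleAsyncException and failuresReportedIn are hypothetical names, and the sketch only assumes what the report describes, namely that an exception from a timer callback is caught and then reported only when the task is RUNNING.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of a task that drops callback exceptions unless it is RUNNING. */
public class SwallowedTimerExceptionSketch {

    enum State { INITIALIZING, RUNNING }

    static final class ToyTask {
        State state = State.INITIALIZING;
        final List<Throwable> reportedFailures = new ArrayList<>();

        /** Runs a timer callback; any exception is routed to the async handler. */
        void invokeTimerCallback(Runnable userCallback) {
            try {
                userCallback.run();
            } catch (Throwable t) {
                handleAsyncException(t);
            }
        }

        /** Reports the failure only while RUNNING; in any other state it is silently dropped. */
        void handleAsyncException(Throwable t) {
            if (state == State.RUNNING) {
                reportedFailures.add(t);
            }
        }
    }

    /** Fires a callback that parses "abc" and returns how many failures were reported. */
    static int failuresReportedIn(State state) {
        ToyTask task = new ToyTask();
        task.state = state;
        task.invokeTimerCallback(() -> Integer.valueOf("abc"));
        return task.reportedFailures.size();
    }

    public static void main(String[] args) {
        System.out.println("INITIALIZING: " + failuresReportedIn(State.INITIALIZING)); // prints 0
        System.out.println("RUNNING: " + failuresReportedIn(State.RUNNING)); // prints 1
    }
}
```

Under this assumption, a window timer that fires while the subtask is still INITIALIZING loses its exception, which would match the third attempt appearing to run normally.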
*Stack trace in attempt 2*
The user function was called in StreamTask.restore (subtask state is INITIALIZING):
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
[ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xie Yi updated FLINK-29816:
---
    Description:
h4. 1. How to reproduce
Use a ProcessWindowFunction and throw an exception in process().
Test code:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);

    KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
            .setBootstrapServers("")
            .setTopics("")
            .setGroupId("")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();

    DataStreamSource<String> kafkaSource =
            env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

    SingleOutputStreamOperator<String> mapSourse = kafkaSource
            .keyBy(s -> s)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context,
                        Iterable<String> iterable, Collector<String> collector) throws Exception {
                    // Processing the event "abc" throws java.lang.NumberFormatException.
                    Integer intS = Integer.valueOf(s);
                    collector.collect(s);
                }
            })
            .name("name-process").uid("uid-process");

    mapSourse.print();
    env.execute();
}
{code}
Kafka input events:
{code:java}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}
h4. 2. Fault phenomenon
When the job processes the event "abc", it throws java.lang.NumberFormatException and fails over. The job is then expected to restart and fail over repeatedly.
However, it only fails over twice (attempt 0 and attempt 1). On the third attempt it runs normally, with no exception.
!image-2022-10-31-19-54-12-546.png!

h4. 3. Possible cause
Attempt 2 was restored from the checkpoint:
{code:java}
2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}

*Stack trace in the third attempt*
The user function was called in StreamTask.restore (subtask state is INITIALIZING):
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
java.lang.Thread.run(Thread.java:745)
{code}

Stack trace (which caused the failover) in attempt 0 and attempt 1
user function