[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784752#comment-17784752 ]

Darcy Lin commented on FLINK-33484:
-----------------------------------

[~martijnvisser] The issue remains the same with flink-connector-kafka:3.0.1-1.17. I checked the implementation of {{emitRecord}}, and it is still the same.

> Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-33484
> URL: https://issues.apache.org/jira/browse/FLINK-33484
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.1
> Environment: Flink 1.17.1, Kafka 2.5.1
> Reporter: Darcy Lin
> Priority: Major
>
> We have encountered an issue with the Flink Kafka connector when consuming
> transactional data from Kafka with {{isolation.level}} set to
> {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}).
> Even when all the data from a topic has been consumed, the offset lag is 1
> rather than 0. When the Kafka Java client consumes the same data, this issue
> does not occur.
>
> We suspect that this issue arises from the way the Flink Kafka connector
> calculates the offset. The problem seems to be in {{KafkaRecordEmitter.java}},
> specifically in the {{emitRecord}} method. When saving the offset, the method
> calls {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. This
> statement works correctly in a regular Kafka scenario, but it might not be
> accurate in {{read_committed}} mode. We believe that it should be
> {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as
> transactional data in Kafka occupies an additional offset to store the
> transaction marker.
>
> We request the Flink team to investigate this issue and provide us with
> guidance on how to resolve it.
>
> Thank you for your attention and support.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784526#comment-17784526 ]

Darcy Lin commented on FLINK-33484:
-----------------------------------

[~martijnvisser] org.apache.flink:flink-connector-kafka:1.17.1
[jira] [Created] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
Darcy Lin created FLINK-33484:
---------------------------------

Summary: Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
Key: FLINK-33484
URL: https://issues.apache.org/jira/browse/FLINK-33484
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1
Environment: Flink 1.17.1, Kafka 2.5.1
Reporter: Darcy Lin

We have encountered an issue with the Flink Kafka connector when consuming transactional data from Kafka with {{isolation.level}} set to {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}). Even when all the data from a topic has been consumed, the offset lag is 1 rather than 0. When the Kafka Java client consumes the same data, this issue does not occur.

We suspect that this issue arises from the way the Flink Kafka connector calculates the offset. The problem seems to be in {{KafkaRecordEmitter.java}}, specifically in the {{emitRecord}} method. When saving the offset, the method calls {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. This statement works correctly in a regular Kafka scenario, but it might not be accurate in {{read_committed}} mode. We believe that it should be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as transactional data in Kafka occupies an additional offset to store the transaction marker.

We request the Flink team to investigate this issue and provide us with guidance on how to resolve it.

Thank you for your attention and support.
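The reported lag of 1 can be pictured with a little arithmetic. The sketch below is only an illustration under stated assumptions: it assumes that lag is measured as the partition's log end offset minus the committed offset, and that a committed transaction appends exactly one marker after its last data record, as the description says. The class name, method names, and offsets are hypothetical and are not taken from the connector.

```java
/** Hypothetical model of one partition holding a single committed transaction. */
final class OffsetLagSketch {

    /** Offset stored after emitting a record, mirroring the quoted "offset() + 1" statement. */
    static long nextOffsetAfter(long lastRecordOffset) {
        return lastRecordOffset + 1;
    }

    /** Lag as assumed here: log end offset minus committed offset. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // Assumed layout: data records at offsets 0..9, transaction marker at offset 10.
        long lastRecordOffset = 9;
        long logEndOffset = 11; // next offset to be written, one past the marker

        long committed = nextOffsetAfter(lastRecordOffset);
        System.out.println("committed=" + committed + " lag=" + lag(logEndOffset, committed));
    }
}
```

With these assumed offsets the program prints {{committed=10 lag=1}}: the stored offset stops at the marker, which matches the symptom described in the report. Whether a fixed "+ 2" is the right remedy is a separate question that this sketch does not settle.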
[jira] [Created] (FLINK-33239) After enabling exactly-once in the Flink Kafka sink, the Kafka broker's memory keeps increasing, eventually causing the Kafka broker to crash.
Darcy Lin created FLINK-33239:
---------------------------------

Summary: After enabling exactly-once in the Flink Kafka sink, the Kafka broker's memory keeps increasing, eventually causing the Kafka broker to crash.
Key: FLINK-33239
URL: https://issues.apache.org/jira/browse/FLINK-33239
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1
Environment: Flink 1.17.1, Kafka server 2.8.2
Reporter: Darcy Lin
Attachments: image-2023-10-11-18-47-32-712.png

We are using Flink version 1.17.1 and Kafka server version 2.8.2. After enabling exactly-once, we set the checkpoint interval to 5 seconds so that downstream consumers can read data from Kafka as soon as possible. Approximately three days after we started writing to the Kafka cluster, the Kafka JVM's memory was exhausted. We printed the memory consumption and found that most of it is taken by {{kafka.log.ProducerStateEntry}} objects.

Currently, in the exactly-once Kafka sink, a new producer is created every time a checkpoint is executed. The {{kafka.log.ProducerStateEntry}} object seems to store the producer's state, so the number of these objects keeps increasing.

We'd like to ask: Is this normal? If it is normal, do we need to allocate a large amount of memory for our Kafka cluster? If it is not normal, how should we solve this problem?

!image-2023-10-11-18-47-32-712.png!
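To get a feel for the scale described above, the following back-of-the-envelope sketch counts how many producers would be created over three days at a 5-second checkpoint interval. It assumes one new producer per checkpoint per sink subtask, as the report states; the parallelism value and all names in the code are hypothetical, and the sketch says nothing about how long the broker actually retains each producer's state.

```java
/** Hypothetical estimate of producer churn for an exactly-once sink. */
final class ProducerChurnSketch {

    /** Producers created in a period, assuming one new producer per checkpoint per sink subtask. */
    static long producersCreated(long periodSeconds, long checkpointIntervalSeconds, int sinkParallelism) {
        long checkpoints = periodSeconds / checkpointIntervalSeconds;
        return checkpoints * sinkParallelism;
    }

    public static void main(String[] args) {
        long threeDaysSeconds = 3L * 24 * 60 * 60; // 259,200 seconds
        long checkpointIntervalSeconds = 5;        // interval from the report
        int sinkParallelism = 4;                   // assumed; not given in the report

        System.out.println(producersCreated(threeDaysSeconds, checkpointIntervalSeconds, sinkParallelism));
    }
}
```

With the assumed parallelism of 4 this prints 207360. If each of those producers leaves at least one state entry on the broker until it expires, steady growth over several days would be consistent with the heap observation in the report.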
[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup
[ https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703004#comment-17703004 ] Darcy Lin commented on FLINK-31259: --- [~fsk119] I generally agree with your conclusions. I see that in the latest version, sql client already supports connecting to sql gateway (./bin/sql-client.sh gateway --endpoint ), which may cause the problem of point 1 you mentioned. > Gateway supports initialization of catalog at startup > - > > Key: FLINK-31259 > URL: https://issues.apache.org/jira/browse/FLINK-31259 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Labels: pull-request-available > > Support to initializing catalogs in gateway when it starts -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup
[ https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701476#comment-17701476 ]

Darcy Lin commented on FLINK-31259:
-----------------------------------

[~jark] [FLIP-295|https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Introduce+Pluggable+Catalog+Management] may indeed be able to solve this problem, but I feel that it is aimed more at catalog storage. If sql-gateway supported an "-i, --init" parameter like sql-client does, or a "sql-gateway-defaults.yaml" configuration file like the one in [ververica-flink-sql-gateway|https://github.com/ververica/flink-sql-gateway/blob/master/conf], that would solve this problem very well, because it is not just about catalogs: UDF initialization and similar setup may also be involved.

> Gateway supports initialization of catalog at startup
> -----------------------------------------------------
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Gateway
> Affects Versions: 1.18.0
> Reporter: Fang Yong
> Assignee: Fang Yong
> Priority: Major
>
> Support to initializing catalogs in gateway when it starts
[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup
[ https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701175#comment-17701175 ]

Darcy Lin commented on FLINK-31259:
-----------------------------------

[~jark] We have the same usage scenario: sql-gateway needs to be able to add catalogs after startup, instead of every user of sql-gateway adding the catalogs themselves. Trino, for example, can define a catalog in its configuration file. Sometimes the catalog contains a lot of sensitive information, and even when it does not, it is very cumbersome for users to add it themselves. The same logic applies to vvp: once a catalog is created on the vvp platform, all users can use it.

> Gateway supports initialization of catalog at startup
> -----------------------------------------------------
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Gateway
> Affects Versions: 1.18.0
> Reporter: Shammon FY
> Assignee: Shammon FY
> Priority: Major
>
> Support to initializing catalogs in gateway when it starts
[jira] [Created] (FLINK-27332) org.apache.flink.streaming.api.functions.sink.filesystem.Bucket can't receive completion notification for savepoint
Darcy Lin created FLINK-27332:
---------------------------------

Summary: org.apache.flink.streaming.api.functions.sink.filesystem.Bucket can't receive completion notification for savepoint
Key: FLINK-27332
URL: https://issues.apache.org/jira/browse/FLINK-27332
Project: Flink
Issue Type: Bug
Components: API / DataStream, Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Darcy Lin

2022-04-20 17:28:03,525 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=225 (max part counter=15).
2022-04-20 17:38:06,933 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=226 (max part counter=15).
2022-04-20 17:38:08,228 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=226.

As shown in the log above, checkpoint 225 was triggered by a savepoint, and there is no log entry reporting its completion. Checkpoint 226 is a normal checkpoint, and everything works fine for it.

The impact is that a savepoint cannot move pending files to the finished state.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-21839) SinkFunction snapshotState don't snapshot all data when trigger a stop-with-drain savepoint
[ https://issues.apache.org/jira/browse/FLINK-21839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17320003#comment-17320003 ]

Darcy Lin commented on FLINK-21839:
-----------------------------------

Hi [~ym], I think this problem has nothing to do with the source. My test script "TestSink.java" can reproduce this problem. I think there may be records inserted between the max_watermark and END_OF_PARTITION.

> SinkFunction snapshotState don't snapshot all data when trigger a stop-with-drain savepoint
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-21839
> URL: https://issues.apache.org/jira/browse/FLINK-21839
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.12.2
> Reporter: Darcy Lin
> Assignee: Yuan Mei
> Priority: Critical
> Attachments: TestSink.java
>
> This problem was discovered while I was developing Flink code. My custom sink
> does not send all the data produced by an event-time window when a
> stop-with-drain savepoint is triggered.
> TestSink.java is an example in which SinkFunction invoke() continues to run
> after snapshotState() has executed when a stop-with-drain savepoint is
> triggered through the REST API.
> {code:java}
> //TestSink.java log
> sink open
> invoke: 0
> invoke: 1
> invoke: 2
> invoke: 3
> invoke: 4
> invoke: 5
> invoke: 6
> invoke: 7
> invoke: 8
> invoke: 9
> ...
> invoke: 425
> invoke: 426
> invoke: 427
> snapshotState
> invoke: 428 // It should be executed before snapshotState.
> sink close
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21839) SinkFunction snapshotState don't snapshot all data when trigger a stop-with-drain savepoint
Darcy Lin created FLINK-21839:
---------------------------------

Summary: SinkFunction snapshotState don't snapshot all data when trigger a stop-with-drain savepoint
Key: FLINK-21839
URL: https://issues.apache.org/jira/browse/FLINK-21839
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.12.2
Reporter: Darcy Lin
Attachments: TestSink.java

This problem was discovered while I was developing Flink code. My custom sink does not send all the data produced by an event-time window when a stop-with-drain savepoint is triggered.

TestSink.java is an example in which SinkFunction invoke() continues to run after snapshotState() has executed when a stop-with-drain savepoint is triggered through the REST API.
{code:java}
//TestSink.java log
sink open
invoke: 0
invoke: 1
invoke: 2
invoke: 3
invoke: 4
invoke: 5
invoke: 6
invoke: 7
invoke: 8
invoke: 9
...
invoke: 425
invoke: 426
invoke: 427
snapshotState
invoke: 428 // It should be executed before snapshotState.
sink close
{code}
[jira] [Commented] (FLINK-15406) RocksDB savepoints with heap timers cannot be restored by non-process functions
[ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17008758#comment-17008758 ]

Darcy Lin commented on FLINK-15406:
-----------------------------------

[~sjwiesman] When I set state.backend.rocksdb.timer-service.factory: "ROCKSDB" in conf/flink-conf.yaml, Flink throws the following exception.
{code:java}
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 6dbd0dff30710e0fea0394e71457d594)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
	at org.lilith.stream.attr.AttrStream.buildStream(AttrStream.java:56)
	at org.lilith.stream.attr.AttrStream.main(AttrStream.java:38)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
	at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., (JobManagerRunner.java:152) at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 7 more Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType."ROCKSDB" at java.lang.Enum.valueOf(Enum.java:238) at
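
The last cause in the truncated trace above names the constant as {{PriorityQueueStateType."ROCKSDB"}}, quotation marks included. That suggests, though the thread does not confirm it, that the quotes from the configuration value reached {{Enum.valueOf}} unchanged. The sketch below reproduces that behaviour with a stand-in enum; the enum and class here are hypothetical and are not Flink's own classes.

```java
/** Hypothetical reproduction of an enum lookup that fails on a quoted name. */
final class QuotedEnumSketch {

    /** Stand-in for the enum named in the trace; not the real Flink type. */
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    /** Returns true if the raw string is an exact enum constant name. */
    static boolean parses(String raw) {
        try {
            PriorityQueueStateType.valueOf(raw);
            return true;
        } catch (IllegalArgumentException e) {
            // Thrown with "No enum constant ..." when the name does not match exactly.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(parses("ROCKSDB"));     // unquoted value
        System.out.println(parses("\"ROCKSDB\"")); // value with literal quotes
    }
}
```

{{Enum.valueOf}} requires an exact match on the constant name, so the unquoted value resolves and the quoted one does not. Under that reading, writing the setting without quotation marks would be the first thing to try.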
[jira] [Comment Edited] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
[ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006361#comment-17006361 ]

Darcy Lin edited comment on FLINK-15406 at 1/1/20 10:12 AM:
------------------------------------------------------------

Hi, [~klion26]

[^CountWord.java] This is the demo I have written.
{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint located at "file:///tmp/flink/savepoint". Then you need to run the following commands to reproduce this problem.
{code:java}
nc -lk 12345 // listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream // checkpoint will fail
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream1 // checkpoint will succeed
{code}

was (Author: lintingbin):
Hi, [~klion26]

[^CountWord.java] This is the demo I have written.
{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint located at "file:///tmp/flink/savepoint". Then you need to run the following commands to reproduce this problem.
{code:java}
nc -lk 12345 // listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream // checkpoint will fail
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream // checkpoint will succeed
{code}

> The savepoint is writted by "State Processor API" can't be restore by map or flatmap
> -------------------------------------------------------------------------------------
>
> Key: FLINK-15406
> URL: https://issues.apache.org/jira/browse/FLINK-15406
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.9.1
> Reporter: Darcy Lin
> Priority: Major
> Attachments: CountWord.java
>
> A savepoint written by the "State Processor API" can't be restored by map or
> flatmap. But it can be restored by KeyedProcessFunction.
[jira] [Commented] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
[ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006361#comment-17006361 ]

Darcy Lin commented on FLINK-15406:
-----------------------------------

Hi, [~klion26]

[^CountWord.java] This is the demo I have written.
{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint located at "file:///tmp/flink/savepoint". Then you need to run the following commands to reproduce this problem.
{code:java}
nc -lk 12345 // listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream // checkpoint will fail
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream // checkpoint will succeed
{code}

> The savepoint is writted by "State Processor API" can't be restore by map or flatmap
> -------------------------------------------------------------------------------------
>
> Key: FLINK-15406
> URL: https://issues.apache.org/jira/browse/FLINK-15406
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.9.1
> Reporter: Darcy Lin
> Priority: Major
> Attachments: CountWord.java
>
> A savepoint written by the "State Processor API" can't be restored by map or
> flatmap. But it can be restored by KeyedProcessFunction.
> Following is the error message:
> {code:java}
> java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
> java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708) > at > org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88) > at > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177) > at > org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at > 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.NullPointerException at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at > org.apache.flink.streaming.api.operators.InternalTimersSnapshot.(InternalTimersSnapshot.java:52) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462) > ... 19 more{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
[ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Darcy Lin updated FLINK-15406:
------------------------------
    Attachment: CountWord.java
[jira] [Updated] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
     [ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Darcy Lin updated FLINK-15406:
------------------------------
    Description:
A savepoint written by the State Processor API can't be restored by map or flatmap, but it can be restored by KeyedProcessFunction.

The error message follows:
{code:java}
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more
{code}

  was:
A savepoint written by the State Processor API can't be restored by map or flatmap, but it can be restored by KeyedProcessFunction.

The error message follows:
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at
[jira] [Updated] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
     [ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Darcy Lin updated FLINK-15406:
------------------------------
    Description:
A savepoint written by the State Processor API can't be restored by map or flatmap, but it can be restored by KeyedProcessFunction.

The error message follows:
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more

  was:
A savepoint written by the State Processor API can't be restored by map or flatmap, but it can be restored by KeyedProcessFunction.

The error message follows:
{code:java}
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at
[jira] [Updated] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
     [ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Darcy Lin updated FLINK-15406:
------------------------------
    Description:
A savepoint written by the State Processor API can't be restored by map or flatmap, but it can be restored by KeyedProcessFunction.

The error message follows:
{code:java}
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more
{code}

  was:
A savepoint written by the State Processor API can't be restored by map or flatmap, but it can be restored by KeyedProcessFunction.

The error message follows:
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at
[jira] [Created] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
Darcy Lin created FLINK-15406:
---------------------------------

             Summary: The savepoint is writted by "State Processor API" can't be restore by map or flatmap
                 Key: FLINK-15406
                 URL: https://issues.apache.org/jira/browse/FLINK-15406
             Project: Flink
          Issue Type: Bug
          Components: API / State Processor
    Affects Versions: 1.9.1
            Reporter: Darcy Lin


A savepoint written by the State Processor API can't be restored by map or flatmap, but it can be restored by KeyedProcessFunction.

The error message follows:
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more

--
This message was sent by Atlassian Jira
(v8.3.4#803005)