[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-10 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784752#comment-17784752
 ] 

Darcy Lin commented on FLINK-33484:
---

[~martijnvisser] The issue remains the same with 
flink-connector-kafka:3.0.1-1.17. I checked the implementation of 
{{emitRecord}}, and it is still the same.

> Flink Kafka Connector Offset Lag Issue with Transactional Data and Read 
> Committed Isolation Level
> -
>
> Key: FLINK-33484
> URL: https://issues.apache.org/jira/browse/FLINK-33484
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Flink 1.17.1
> kafka 2.5.1
>Reporter: Darcy Lin
>Priority: Major
>
> We have encountered an issue with the Flink Kafka connector when consuming 
> transactional data from Kafka with {{isolation.level}} set to 
> {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}). 
> The problem is that even when all the data from a topic has been consumed, the 
> offset lag is 1 rather than 0. However, when using the Kafka Java client to 
> consume the same data, this issue does not occur.
> We suspect that this issue arises from the way the Flink Kafka connector 
> calculates the offset. The problem seems to be in the 
> {{KafkaRecordEmitter.java}} file, specifically in the {{emitRecord}} method. 
> When saving the offset, the method calls 
> {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. While this 
> statement works correctly in a regular Kafka scenario, it might not be 
> accurate when {{read_committed}} mode is used. We believe that it should 
> be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as 
> transactional data in Kafka occupies an additional offset to store the 
> transaction marker.
> We request the Flink team to investigate this issue and provide us with 
> guidance on how to resolve it.
> Thank you for your attention and support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-09 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784526#comment-17784526
 ] 

Darcy Lin commented on FLINK-33484:
---

[~martijnvisser] org.apache.flink:flink-connector-kafka:1.17.1






[jira] [Created] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-08 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-33484:
-

 Summary: Flink Kafka Connector Offset Lag Issue with Transactional 
Data and Read Committed Isolation Level
 Key: FLINK-33484
 URL: https://issues.apache.org/jira/browse/FLINK-33484
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1
 Environment: Flink 1.17.1

kafka 2.5.1
Reporter: Darcy Lin


We have encountered an issue with the Flink Kafka connector when consuming 
transactional data from Kafka with {{isolation.level}} set to 
{{read_committed}} ({{setProperty("isolation.level", "read_committed")}}). 
The problem is that even when all the data from a topic has been consumed, the 
offset lag is 1 rather than 0. However, when using the Kafka Java client to 
consume the same data, this issue does not occur.

We suspect that this issue arises from the way the Flink Kafka connector 
calculates the offset. The problem seems to be in the 
{{KafkaRecordEmitter.java}} file, specifically in the {{emitRecord}} method. 
When saving the offset, the method calls 
{{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. While this 
statement works correctly in a regular Kafka scenario, it might not be accurate 
when {{read_committed}} mode is used. We believe that it should be 
{{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as 
transactional data in Kafka occupies an additional offset to store the 
transaction marker.

We request the Flink team to investigate this issue and provide us with 
guidance on how to resolve it.

Thank you for your attention and support.
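
The arithmetic behind the reported lag of 1 can be sketched with a toy model. This is not Flink or Kafka code: the class and method names are hypothetical, the partition layout (five records followed by one commit marker) is made up, and "lag" is assumed to mean log end offset minus stored offset.

```java
import java.util.List;

/** Toy model of one partition's offsets; not Flink or Kafka code. */
final class OffsetLagSketch {

    /** Lag under the assumed definition: log end offset minus stored offset. */
    static long lag(long logEndOffset, long storedOffset) {
        return logEndOffset - storedOffset;
    }

    /** The offset the report says emitRecord stores: last emitted record's offset + 1. */
    static long offsetAfterLastRecord(List<Long> recordOffsets) {
        return recordOffsets.get(recordOffsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        // Hypothetical partition: one transaction with data records at offsets 0..4,
        // followed by the commit marker at offset 5, so the log end offset is 6.
        List<Long> recordOffsets = List.of(0L, 1L, 2L, 3L, 4L);
        long logEndOffset = 6L;

        long stored = offsetAfterLastRecord(recordOffsets);
        System.out.println("stored offset = " + stored);          // 5
        System.out.println("lag = " + lag(logEndOffset, stored)); // 1
    }
}
```

The sketch covers only the case described in the report, where a single marker directly follows the last emitted record; it says nothing about records in the middle of a transaction.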





[jira] [Created] (FLINK-33239) After enabling exactly-once in the Flink Kafka sink, the Kafka broker's memory keeps increasing, eventually causing the Kafka broker to crash.

2023-10-11 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-33239:
-

 Summary: After enabling exactly-once in the Flink Kafka sink, the 
Kafka broker's memory keeps increasing, eventually causing the Kafka broker to 
crash.
 Key: FLINK-33239
 URL: https://issues.apache.org/jira/browse/FLINK-33239
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1
 Environment: flink 1.17.1

kafka server 2.8.2
Reporter: Darcy Lin
 Attachments: image-2023-10-11-18-47-32-712.png

We are using Flink version 1.17.1 and Kafka server version 2.8.2. After 
enabling exactly-once, in order to allow downstream consumers to read data from 
Kafka as soon as possible, we set the checkpoint interval to 5 seconds. 
Approximately three days after writing to the Kafka cluster, the Kafka JVM's 
memory is exhausted. We printed the memory consumption and found that the main 
consumption is on the {{kafka.log.ProducerStateEntry}} object.

Currently, in the exactly-once Kafka sink, a new producer is created every time 
a checkpoint is executed. The {{kafka.log.ProducerStateEntry}} object seems to 
store the producer's state, so it keeps increasing. We'd like to ask: Is this 
normal? If it's normal, do we need to allocate a large amount of memory for our 
Kafka cluster? If it's not normal, how should we solve this problem?
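
To get a feel for the numbers involved, the sketch below counts how many producers would be created over three days at a 5-second checkpoint interval. It assumes, as described above, one new producer per checkpoint, and additionally assumes one producer per sink subtask; the parallelism of 4 and all class and method names are made up for illustration. Whether each producer leaves exactly one {{kafka.log.ProducerStateEntry}} behind on the broker is not confirmed here.

```java
/** Back-of-the-envelope count of producers created by an exactly-once sink. */
final class ProducerStateGrowthSketch {

    /** Number of checkpoints in a time window, given the checkpoint interval. */
    static long checkpointsIn(long windowSeconds, long checkpointIntervalSeconds) {
        return windowSeconds / checkpointIntervalSeconds;
    }

    /** Producers created in the window, assuming one new producer per checkpoint per subtask. */
    static long producersCreated(long windowSeconds, long checkpointIntervalSeconds, int sinkSubtasks) {
        return checkpointsIn(windowSeconds, checkpointIntervalSeconds) * sinkSubtasks;
    }

    public static void main(String[] args) {
        long threeDays = 3L * 24 * 60 * 60; // 259200 seconds
        // 5-second interval is from the report; a parallelism of 4 is a made-up value.
        System.out.println(producersCreated(threeDays, 5, 1)); // 51840
        System.out.println(producersCreated(threeDays, 5, 4)); // 207360
    }
}
```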

!image-2023-10-11-18-47-32-712.png!





[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup

2023-03-20 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703004#comment-17703004
 ] 

Darcy Lin commented on FLINK-31259:
---

[~fsk119] I generally agree with your conclusions. In the latest version, the 
SQL Client already supports connecting to the SQL Gateway 
(./bin/sql-client.sh gateway --endpoint ), which may cause the problem you 
mentioned in point 1.

> Gateway supports initialization of catalog at startup
> -
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> Support initializing catalogs in the gateway when it starts





[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup

2023-03-16 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701476#comment-17701476
 ] 

Darcy Lin commented on FLINK-31259:
---

[~jark] 
[FLIP-295|https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Introduce+Pluggable+Catalog+Management]
 may indeed be able to solve this problem, but I feel that it is aimed more at 
catalog storage. If sql-gateway supported a " -i, --init " parameter like 
sql-client does, or a "sql-gateway-defaults.yaml" configuration file like the 
one in 
[ververica-flink-sql-gateway|https://github.com/ververica/flink-sql-gateway/blob/master/conf],
 that would solve this problem very well, because it is not just about 
catalogs: it may also involve UDF initialization and so on.

> Gateway supports initialization of catalog at startup
> -
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Support initializing catalogs in the gateway when it starts





[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup

2023-03-16 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701175#comment-17701175
 ] 

Darcy Lin commented on FLINK-31259:
---

[~jark] We have the same usage scenario: sql-gateway needs to be able to add 
catalogs itself once it has started, instead of every user of sql-gateway 
adding the catalog. Trino, for example, lets you define a catalog in its 
configuration file. Sometimes the catalog contains a lot of sensitive 
information, and even when it does not, it is very cumbersome for users to add 
it themselves. The same logic applies to vvp: once a catalog is created on the 
vvp platform, all users can use it.

> Gateway supports initialization of catalog at startup
> -
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Shammon FY
>Assignee: Shammon FY
>Priority: Major
>
> Support initializing catalogs in the gateway when it starts





[jira] [Created] (FLINK-27332) org.apache.flink.streaming.api.functions.sink.filesystem.Bucket can't receive completion notification for savepoint

2022-04-20 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-27332:
-

 Summary: 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket can't receive 
completion notification for savepoint
 Key: FLINK-27332
 URL: https://issues.apache.org/jira/browse/FLINK-27332
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Darcy Lin


2022-04-20 17:28:03,525 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=225 (max part counter=15).

2022-04-20 17:38:06,933 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=226 (max part counter=15).
2022-04-20 17:38:08,228 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=226.
 

As shown in the log above, checkpoint 225 was triggered by a savepoint, and 
there is no "received completion notification" entry for it, whereas checkpoint 
226 is a normal checkpoint and everything works fine. The impact is that a 
savepoint cannot move pending files to the finished state.
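
One way to spot such checkpoints in a longer log is sketched below. It is only an illustration: the class and method names are hypothetical, the two message patterns are copied from the log lines above, and each log entry is assumed to sit on a single line.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Finds checkpoint ids that were snapshotted but never reported as completed. */
final class MissingCompletionSketch {

    private static final Pattern STARTED =
            Pattern.compile("checkpointing for checkpoint with id=(\\d+)");
    private static final Pattern COMPLETED =
            Pattern.compile("received completion notification for checkpoint with id=(\\d+)");

    /** Returns the ids with a "checkpointing" entry but no "completion notification" entry. */
    static Set<Long> withoutCompletion(List<String> logLines) {
        Set<Long> pending = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher started = STARTED.matcher(line);
            if (started.find()) {
                pending.add(Long.parseLong(started.group(1)));
            }
            Matcher completed = COMPLETED.matcher(line);
            if (completed.find()) {
                pending.remove(Long.parseLong(completed.group(1)));
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<String> log = List.of(
                "Subtask 0 checkpointing for checkpoint with id=225 (max part counter=15).",
                "Subtask 0 checkpointing for checkpoint with id=226 (max part counter=15).",
                "Subtask 0 received completion notification for checkpoint with id=226.");
        System.out.println(withoutCompletion(log)); // [225]
    }
}
```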
 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-21839) SinkFunction snapshotState don't snapshot all data when trigger a stop-with-drain savepoint

2021-04-13 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17320003#comment-17320003
 ] 

Darcy Lin commented on FLINK-21839:
---

Hi [~ym], I think this problem has nothing to do with the source. My test 
script "TestSink.java" can reproduce this problem. I think there may be records 
inserted between the max_watermark and END_OF_PARTITION.

> SinkFunction snapshotState don't snapshot all data when trigger a 
> stop-with-drain savepoint
> ---
>
> Key: FLINK-21839
> URL: https://issues.apache.org/jira/browse/FLINK-21839
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.2
>Reporter: Darcy Lin
>Assignee: Yuan Mei
>Priority: Critical
> Attachments: TestSink.java
>
>
> This problem was discovered while I was developing Flink code. In my Flink 
> code, my custom sink doesn't send all of the data produced by an event-time 
> window when a stop-with-drain savepoint is triggered.
> TestSink.java is an example in which SinkFunction invoke() continues to run 
> after snapshotState() has executed when a stop-with-drain savepoint is 
> triggered through the REST API.
> {code:java}
> // TestSink.java log
> sink open
> invoke: 0
> invoke: 1
> invoke: 2
> invoke: 3
> invoke: 4
> invoke: 5
> invoke: 6
> invoke: 7
> invoke: 8
> invoke: 9
> ...
> invoke: 425
> invoke: 426
> invoke: 427
> snapshotState
> invoke: 428 // It should be executed before snapshotState.
> sink close{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21839) SinkFunction snapshotState don't snapshot all data when trigger a stop-with-drain savepoint

2021-03-16 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-21839:
-

 Summary: SinkFunction snapshotState don't snapshot all data when 
trigger a stop-with-drain savepoint
 Key: FLINK-21839
 URL: https://issues.apache.org/jira/browse/FLINK-21839
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.12.2
Reporter: Darcy Lin
 Attachments: TestSink.java

This problem was discovered while I was developing Flink code. In my Flink 
code, my custom sink doesn't send all of the data produced by an event-time 
window when a stop-with-drain savepoint is triggered.

TestSink.java is an example in which SinkFunction invoke() continues to run 
after snapshotState() has executed when a stop-with-drain savepoint is 
triggered through the REST API.
{code:java}
// TestSink.java log
sink open
invoke: 0
invoke: 1
invoke: 2
invoke: 3
invoke: 4
invoke: 5
invoke: 6
invoke: 7
invoke: 8
invoke: 9
...
invoke: 425
invoke: 426
invoke: 427
snapshotState
invoke: 428 // It should be executed before snapshotState.
sink close{code}
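
The ordering problem in the log above can be checked mechanically. The sketch below is a hypothetical helper, not part of TestSink.java: it scans log lines in the format shown above and lists every invoke line that appears after snapshotState. For a stop-with-drain savepoint, where the snapshot is expected to come last, that list should be empty.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists "invoke: N" log lines that appear after the first "snapshotState" line. */
final class InvokeAfterSnapshotSketch {

    static List<String> invokesAfterSnapshot(List<String> logLines) {
        List<String> late = new ArrayList<>();
        boolean snapshotSeen = false;
        for (String raw : logLines) {
            String line = raw.trim();
            if (line.equals("snapshotState")) {
                snapshotSeen = true;
            } else if (snapshotSeen && line.startsWith("invoke:")) {
                late.add(line);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Shortened copy of the tail of the log above.
        List<String> log = List.of(
                "sink open", "invoke: 426", "invoke: 427",
                "snapshotState", "invoke: 428", "sink close");
        System.out.println(invokesAfterSnapshot(log)); // [invoke: 428]
    }
}
```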





[jira] [Commented] (FLINK-15406) RocksDB savepoints with heap timers cannot be restored by non-process functions

2020-01-06 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17008758#comment-17008758
 ] 

Darcy Lin commented on FLINK-15406:
---

[~sjwiesman] When I set state.backend.rocksdb.timer-service.factory: "ROCKSDB" 
in conf/flink-conf.yaml, Flink throws the following exception:
{code:java}
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 6dbd0dff30710e0fea0394e71457d594)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.lilith.stream.attr.AttrStream.buildStream(AttrStream.java:56)
at org.lilith.stream.attr.AttrStream.main(AttrStream.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error., (JobManagerRunner.java:152)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.IllegalArgumentException: No enum constant 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType."ROCKSDB"
at java.lang.Enum.valueOf(Enum.java:238)
at 
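
The last "Caused by" above names the missing constant as "ROCKSDB" with the double quotes included. That suggests, as an assumption not confirmed in this thread, that the quotes from the configuration value were passed through literally to the enum lookup. The sketch below reproduces that behavior with a stand-in enum; the class name and the parse helper are hypothetical, and only {{Enum.valueOf}} is real API.

```java
/** Shows how a quoted config value fails an enum lookup; stand-in types only. */
final class TimerFactoryEnumSketch {

    /** Stand-in for the real enum; only the constant names matter here. */
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    /** Parses a raw config value, dropping one pair of surrounding double quotes if present. */
    static PriorityQueueStateType parse(String rawValue) {
        String value = rawValue.trim();
        if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
            value = value.substring(1, value.length() - 1);
        }
        return PriorityQueueStateType.valueOf(value);
    }

    public static void main(String[] args) {
        try {
            // Quotes kept as part of the name: no such constant exists.
            PriorityQueueStateType.valueOf("\"ROCKSDB\"");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(parse("\"ROCKSDB\"")); // ROCKSDB
    }
}
```

If that assumption holds, writing the value without quotes in conf/flink-conf.yaml may avoid the exception.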

[jira] [Comment Edited] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap

2020-01-01 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006361#comment-17006361
 ] 

Darcy Lin edited comment on FLINK-15406 at 1/1/20 10:12 AM:


Hi, [~klion26] 

[^CountWord.java] This is the demo I wrote.
{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint at 
"file:///tmp/flink/savepoint". Then run the following commands to reproduce 
this problem.
{code:java}
nc -lk 12345   # listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream    # the checkpoint fails
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream1   # the checkpoint succeeds
{code}
 


was (Author: lintingbin):
Hi, [~klion26] 

[^CountWord.java] This is the demo I wrote. 

{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint at 
"file:///tmp/flink/savepoint". Then run the following commands to reproduce 
this problem.

{code:java}
nc -lk 12345   # listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream   # the checkpoint fails
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream   # the checkpoint succeeds
{code}
 


[jira] [Commented] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap

2020-01-01 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006361#comment-17006361
 ] 

Darcy Lin commented on FLINK-15406:
---

Hi, [~klion26] 

[^CountWord.java] This is the demo I wrote. 

{code:java}
flink run CountWord.jar --init
{code}
If you run the above command, it generates a savepoint at 
"file:///tmp/flink/savepoint". Then run the following commands to reproduce 
this problem.

{code:java}
nc -lk 12345   # listen on port 12345
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream   # the checkpoint fails
flink run -s file:///tmp/flink/savepoint CountWord.jar --stream   # the checkpoint succeeds
{code}
 

> The savepoint is writted by "State Processor API" can't be restore by map or 
> flatmap
> 
>
> Key: FLINK-15406
> URL: https://issues.apache.org/jira/browse/FLINK-15406
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.9.1
>Reporter: Darcy Lin
>Priority: Major
> Attachments: CountWord.java
>
>
> A savepoint written by the "State Processor API" can't be restored by map or 
> flatmap, but it can be restored by KeyedProcessFunction.  
>  The error message follows:
> {code:java}
> java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
>     ... 19 more{code}
>   





[jira] [Updated] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap

2020-01-01 Thread Darcy Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Lin updated FLINK-15406:
--
Attachment: CountWord.java




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15406) A savepoint written by the "State Processor API" can't be restored by map or flatmap

2019-12-26 Thread Darcy Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Lin updated FLINK-15406:
--
Description: 
A savepoint written by the "State Processor API" can't be restored by map or 
flatmap, but it can be restored by KeyedProcessFunction.
The error message is as follows:
{code:java}
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more
{code}

  

  was:
A savepoint written by the "State Processor API" can't be restored by map or 
flatmap, but it can be restored by KeyedProcessFunction.
The error message is as follows:

java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
 at 

[jira] [Updated] (FLINK-15406) A savepoint written by the "State Processor API" can't be restored by map or flatmap

2019-12-26 Thread Darcy Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Lin updated FLINK-15406:
--
Description: 
A savepoint written by the "State Processor API" can't be restored by map or 
flatmap, but it can be restored by KeyedProcessFunction.
The error message is as follows:

java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more
 

  was:
A savepoint written by the "State Processor API" can't be restored by map or 
flatmap, but it can be restored by KeyedProcessFunction.
The error message is as follows:
{code:java}
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
 at 

[jira] [Updated] (FLINK-15406) A savepoint written by the "State Processor API" can't be restored by map or flatmap

2019-12-26 Thread Darcy Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Lin updated FLINK-15406:
--
Description: 
A savepoint written by the "State Processor API" can't be restored by map or 
flatmap, but it can be restored by KeyedProcessFunction.
The error message is as follows:
{code:java}
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more
{code}

  was:
A savepoint written by the "State Processor API" can't be restored by map or 
flatmap, but it can be restored by KeyedProcessFunction.
The error message is as follows:

java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
 at 

[jira] [Created] (FLINK-15406) A savepoint written by the "State Processor API" can't be restored by map or flatmap

2019-12-26 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-15406:
-

         Summary: A savepoint written by the "State Processor API" can't be restored by map or flatmap
             Key: FLINK-15406
             URL: https://issues.apache.org/jira/browse/FLINK-15406
         Project: Flink
      Issue Type: Bug
      Components: API / State Processor
Affects Versions: 1.9.1
        Reporter: Darcy Lin


A savepoint written by the "State Processor API" can't be restored by map or 
flatmap, but it can be restored by KeyedProcessFunction.
The error message is as follows:

java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more
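
The trace above ends in a NullPointerException thrown by Preconditions.checkNotNull in the InternalTimersSnapshot constructor frame, reached while the timer service is being snapshotted. As a rough illustration of that failure shape only, here is a plain-Java sketch. Every class in it (TimerSnapshotSketch, SketchTimerService, SketchSnapshot) is hypothetical and none of it is Flink code. It assumes, without confirmation from this report, that the null value is a field which is only populated when the service is started.

```java
import java.util.Objects;

/** Illustrative only: hypothetical classes, not Flink's implementation. */
public class TimerSnapshotSketch {

    /** Hypothetical timer service whose serializers are set only by start(). */
    static final class SketchTimerService {
        private String keySerializer;       // null until start() is called
        private String namespaceSerializer; // null until start() is called

        void start(String keySerializer, String namespaceSerializer) {
            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
        }

        SketchSnapshot snapshot() {
            // Passes the fields through as-is, even if they were never set.
            return new SketchSnapshot(keySerializer, namespaceSerializer);
        }
    }

    /** Hypothetical snapshot that rejects null serializers in its constructor. */
    static final class SketchSnapshot {
        final String keySerializer;
        final String namespaceSerializer;

        SketchSnapshot(String keySerializer, String namespaceSerializer) {
            this.keySerializer = Objects.requireNonNull(keySerializer, "keySerializer");
            this.namespaceSerializer =
                    Objects.requireNonNull(namespaceSerializer, "namespaceSerializer");
        }
    }

    /** Returns true if taking a snapshot throws a NullPointerException. */
    static boolean snapshotFails(SketchTimerService service) {
        try {
            service.snapshot();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SketchTimerService neverStarted = new SketchTimerService();
        System.out.println("never started, snapshot fails: " + snapshotFails(neverStarted));

        SketchTimerService started = new SketchTimerService();
        started.start("keySerializer", "namespaceSerializer");
        System.out.println("started, snapshot fails: " + snapshotFails(started));
    }
}
```

In this sketch the snapshot of a service that was never started fails with a NullPointerException, while a started one succeeds. Whether the same mechanism explains the behaviour reported for Flink 1.9.1 would need to be checked against the Flink source.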


