[ https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ran Tao updated FLINK-8638:
---------------------------
    Description: 
The following example comes from a snapshotState call that writes to HDFS. snapshotState failed because of HDFS disk problems, so triggerCheckpointOnBarrier failed and threw an exception, which caused the application to restart. However, on restart Flink has to recover from the most recently completed checkpoint and then catch up on the data, which can lead to significant delays. We think that when a StreamTask's triggerCheckpointOnBarrier (including triggerCheckpoint at the sources) fails, the application should not restart; instead it should keep running, mark the checkpoint as failed, and notify the JobManager that the checkpoint failed. Adding a checkpoint-failure alarm would let developers or users know about the situation and take appropriate action, while the Flink job keeps running the whole time.

 
{code:java}
java.lang.Exception: Could not perform checkpoint 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
    ... 8 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
    at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
    at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
    at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
    at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
    ... 13 more
Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are bad. Aborting...
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401){code}
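
For illustration, here is a minimal, self-contained sketch of the behavior proposed above. It is not Flink's real internals; the Snapshotter and CheckpointCoordinatorGateway types are hypothetical stand-ins. The point is that a snapshot failure is declined and reported instead of being rethrown, so the task keeps running and a later checkpoint can still succeed.
{code:java}
import java.io.IOException;

/**
 * Hypothetical sketch (not Flink's real internals) of the proposed behavior:
 * a failed checkpoint-on-barrier is declined and reported instead of failing
 * the task and forcing a job restart.
 */
public class CheckpointDeclineSketch {

    /** Stand-in for the channel that reports checkpoint results to the JobManager. */
    interface CheckpointCoordinatorGateway {
        void declineCheckpoint(long checkpointId, Throwable reason);
    }

    /** Stand-in for the operator snapshot work that may hit HDFS errors. */
    interface Snapshotter {
        void snapshotState(long checkpointId) throws Exception;
    }

    private final Snapshotter snapshotter;
    private final CheckpointCoordinatorGateway coordinator;

    CheckpointDeclineSketch(Snapshotter snapshotter, CheckpointCoordinatorGateway coordinator) {
        this.snapshotter = snapshotter;
        this.coordinator = coordinator;
    }

    /**
     * Proposed flow: if the snapshot fails (e.g. the HDFS output stream cannot be
     * flushed), decline the checkpoint and notify the JobManager, but do not
     * propagate the exception, so the task keeps processing records.
     */
    boolean triggerCheckpointOnBarrier(long checkpointId) {
        try {
            snapshotter.snapshotState(checkpointId);
            return true; // checkpoint acknowledged as usual
        } catch (Exception e) {
            // Instead of rethrowing (which restarts the job today), mark this
            // checkpoint as failed and surface the failure for alerting.
            coordinator.declineCheckpoint(checkpointId, e);
            return false; // task stays alive; a later checkpoint can still succeed
        }
    }

    public static void main(String[] args) {
        CheckpointDeclineSketch task = new CheckpointDeclineSketch(
                id -> { throw new IOException("All datanodes are bad. Aborting..."); },
                (id, reason) -> System.out.println(
                        "Checkpoint " + id + " declined: " + reason.getMessage()));

        boolean acknowledged = task.triggerCheckpointOnBarrier(45843L);
        System.out.println("Checkpoint acknowledged: " + acknowledged + " (job keeps running)");
    }
}
{code}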
 


> Job restart when Checkpoint On Barrier failed
> ---------------------------------------------
>
>                 Key: FLINK-8638
>                 URL: https://issues.apache.org/jira/browse/FLINK-8638
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Ran Tao
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
