From: Song PinRu
Sent: December 21, 2021, 15:19
To: [email protected]
Subject: Flink fixed-delay restart strategy applies no delay

Hi:
    The screenshots in yesterday's email could not be viewed, so I am resending it with the logs pasted inline.
------------------------------------------------------

Looking at the logs, the fixed-delay restart strategy does not seem to have taken effect. I configured 2 restart attempts with a 30-second delay,

but the log shows both restart attempts happening and failing within the single second at 06:26:50, which ultimately caused the job to fail.

The delay I configured appears to have had no effect at all. The Flink version is 1.12.2.

Can anyone tell me why this is happening?



Code that sets the restart strategy:
```

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT)
env.setStateBackend(backend)
// Start a checkpoint every 30000 ms
env.enableCheckpointing(30000)
// Set the checkpointing mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// Require at least 10000 ms between the end of one checkpoint and the start of the next
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000)
// A checkpoint must complete within 2 minutes, otherwise it is discarded
env.getCheckpointConfig.setCheckpointTimeout(120000)
// Number of checkpoint failures that can be tolerated
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
// Allow only one checkpoint at a time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// Set the global parallelism
//  env.setParallelism(3)
// Restart strategy
// PS: the default strategy restarts Integer.MAX_VALUE times, which keeps the job in a permanent
// restarting state, produces runs of empty checkpoint folders, and leaves no usable checkpoint
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(30)))
```
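
For reference, below is a minimal sketch of an alternative I am considering, in case the real problem is that two subtasks failing within the same second immediately exhaust the two fixed-delay attempts: a failure-rate restart strategy that tolerates several failures within a time window before giving up. The thresholds (3 failures per 5 minutes, 30 s delay) are illustrative assumptions only, not values I have tested; it reuses the same `env`, `RestartStrategies`, and `Time` as the snippet above.

```
// Hypothetical alternative (untested): instead of a hard cap of 2 restart
// attempts in total, allow up to 3 failures within any 5-minute window,
// waiting 30 s between restart attempts.
env.setRestartStrategy(
  RestartStrategies.failureRateRestart(
    3,                // max failures within the interval before the job fails
    Time.minutes(5),  // measurement interval for the failure rate
    Time.seconds(30)  // delay between restart attempts
  )
)
```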






Log:
```

 2021-12-21 06:26:50,850 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a) switched 
from RUNNING to FAILED on container_e1595_1638345947522_0010_01_000003 @ 
pbj-cdh-20-72.optaim.com (dataPort=35530).

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: This server is not the leader for that topic-partition.

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.

2021-12-21 06:26:50,855 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_1.

2021-12-21 06:26:50,855 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 1 tasks should be restarted to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_1. 

2021-12-21 06:26:50,855 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state 
RUNNING to RESTARTING.

2021-12-21 06:26:50,905 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
source -> note -> Sink: sink (3/3) (ac66ba9cfafe75f87709073e000cf2e8) switched 
from RUNNING to FAILED on container_e1595_1638345947522_0010_01_000004 @ 
pbj-cdh-20-72.optaim.com (dataPort=21770).

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: This server is not the leader for that topic-partition.

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.

2021-12-21 06:26:50,906 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_2.

2021-12-21 06:26:50,906 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 1 tasks should be restarted to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_2. 

2021-12-21 06:26:50,907 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state 
RESTARTING to FAILING.

org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, 
backoffTimeMS=30000)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_181]

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_181]

at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: This server is not the leader for that 
topic-partition.

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.

2021-12-21 06:26:50,915 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
source -> note -> Sink: sink (1/3) (bb18842478f275632da311176b24d5e2) switched 
from RUNNING to CANCELING.

2021-12-21 06:26:50,975 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
source -> note -> Sink: sink (1/3) (bb18842478f275632da311176b24d5e2) switched 
from CANCELING to CANCELED.

2021-12-21 06:26:50,976 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state 
FAILING to FAILED.

org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, 
backoffTimeMS=30000)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_181]

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_181]

at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.12.2.jar:1.12.2]

Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: This server is not the leader for that 
topic-partition.

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[dws_module-1.0.4.jar:?]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]

Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.

```

--

Wishing you smooth work and a happy life!
From: Song PinRu
Position: Big Data Development
Date: 2021-12-22
