Hi:
Thanks for the reply, that clears it up. One more question: the log shows "Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=30000)". Does this mean my job has already restarted twice and still failed? I cannot find any restart records in the log; how should I confirm this? Also, is the error reported here from the first restart attempt or from the last one? Thanks!

On 2021-12-22 10:47:24, "Caizhi Weng" <[email protected]> wrote:
>Hi!
>
>These log messages are different parallel subtasks of the same job each
>failing (you can tell from the two different subtask indices, 2/3 and 3/3);
>they do not mean the job failed twice.
>
>宋品如 <[email protected]> wrote on Wed, Dec 22, 2021 at 10:14:
>
>> From: Song PinRu
>> Sent: 2021-12-21 15:19
>> To: [email protected]
>> Subject: Flink fixed-delay restart strategy has no delay
>>
>> Hi:
>> The screenshots in yesterday's email could not be viewed, so I am pasting
>> the logs and resending.
>> ------------------------------------------------------
>>
>> Looking at the logs, the fixed-delay restart strategy does not seem to take
>> effect. I configured 2 restarts with a 30 s delay,
>>
>> but the log shows that within the single second 06:26:50 it restarted twice
>> and both attempts failed, which finally caused the job to fail.
>>
>> The delay I configured seems to have had no effect at all. The Flink
>> version is 1.12.2.
>>
>> Can anyone tell me why?
>>
>> Code that sets the restart strategy:
>> ```
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val backend = new FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT)
>> env.setStateBackend(backend)
>> // Start a checkpoint every 30000 ms
>> env.enableCheckpointing(30000)
>> // Set the checkpointing mode to exactly-once (this is the default)
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>> // Require at least 10000 ms between checkpoints
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000)
>> // A checkpoint must finish within 2 minutes or it is discarded
>> env.getCheckpointConfig.setCheckpointTimeout(120000)
>> // Number of tolerable checkpoint failures
>> env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
>> // Allow only one checkpoint at a time
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> // Set the global parallelism
>> // env.setParallelism(3)
>> // Restart strategy
>> // NB: the default strategy restarts Integer.MAX_VALUE times, which leaves the
>> // job stuck restarting, produces consecutive empty checkpoint directories,
>> // and makes the valid checkpoints unusable
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(30)))
>> ```
>>
>> Log:
>> ```
>> 2021-12-21 06:26:50,850 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a) switched from RUNNING to FAILED on
container_e1595_1638345947522_0010_01_000003 @ pbj-cdh-20-72.optaim.com (dataPort=35530).
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[dws_module-1.0.4.jar:?]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850) ~[dws_module-1.0.4.jar:?]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[dws_module-1.0.4.jar:?]
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[dws_module-1.0.4.jar:?]
>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) ~[dws_module-1.0.4.jar:?]
>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[dws_module-1.0.4.jar:?]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[dws_module-1.0.4.jar:?]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>
>> 2021-12-21 06:26:50,855 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_1.
>> 2021-12-21 06:26:50,855 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_1.
>> 2021-12-21 06:26:50,855 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state RUNNING to RESTARTING.
>> 2021-12-21 06:26:50,905 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source -> note -> Sink: sink (3/3) (ac66ba9cfafe75f87709073e000cf2e8) switched from RUNNING to FAILED on container_e1595_1638345947522_0010_01_000004 @ pbj-cdh-20-72.optaim.com (dataPort=21770).
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>>     [stack trace identical to the FlinkKafkaException trace above; omitted]
>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>
>> 2021-12-21 06:26:50,906 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_2.
>> 2021-12-21 06:26:50,906 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_2.
>> 2021-12-21 06:26:50,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state RESTARTING to FAILING.
>> org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=30000)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.2.jar:1.12.2]
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.2.jar:1.12.2]
>> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>>     [stack trace identical to the FlinkKafkaException trace above; omitted]
>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>
>> 2021-12-21 06:26:50,915 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source -> note -> Sink: sink (1/3) (bb18842478f275632da311176b24d5e2) switched from RUNNING to CANCELING.
>> 2021-12-21 06:26:50,975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source -> note -> Sink: sink (1/3) (bb18842478f275632da311176b24d5e2) switched from CANCELING to CANCELED.
>> 2021-12-21 06:26:50,976 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state FAILING to FAILED.
>> org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=30000)
>>     [stack trace and Caused-by chain identical to the JobException trace above; omitted]
>> ```
>>
>> --
>>
>> Wishing you smooth work and a happy life!
>> From: 宋品如 (Song PinRu)
>> Position: Big Data Development
>> Date: 2021-12-22
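One way to confirm how many restart attempts were actually scheduled is to search the JobManager log for job-level state transitions. Below is a minimal sketch, not part of the original thread: the sample file reuses the three "switched from state" lines from the log above (in practice you would grep your real jobmanager.log, whose path depends on your deployment). Note that in this thread's log there is exactly one RUNNING to RESTARTING transition.

```shell
# Sketch: count scheduled restart attempts from JobManager state transitions.
# Sample input is copied from the log in this thread; replace
# jobmanager_sample.log with your actual JobManager log file.
cat > jobmanager_sample.log <<'EOF'
2021-12-21 06:26:50,855 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state RUNNING to RESTARTING.
2021-12-21 06:26:50,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state RESTARTING to FAILING.
2021-12-21 06:26:50,976 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job NoteInfoSingleStream$ (2c523579f1ed10037d3590aca9a0b957) switched from state FAILING to FAILED.
EOF

# Each RUNNING -> RESTARTING transition is one scheduled restart attempt:
grep -c 'switched from state RUNNING to RESTARTING' jobmanager_sample.log
# prints 1

# Show the job-level state history in order:
grep -o 'state [A-Z]* to [A-Z]*' jobmanager_sample.log
```

The second grep prints the transition sequence (RUNNING to RESTARTING, RESTARTING to FAILING, FAILING to FAILED), which makes it easy to see where the restart budget was exhausted relative to each failure.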
