Error about "Rejected TaskExecutor registration at the ResourceManger"
Hi team,

We encountered an issue while recovering from a checkpoint. The job failed because the downstream Kafka sink was full for about 4 hours, and it kept trying to recover during that time. Even after we scaled up the Kafka cluster, the job could not recover from the checkpoint and shows the following exception. Is there any guidance on how to locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30 01:31:21,42
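A hedged note rather than a definitive diagnosis: this rejection usually means the ResourceManager restarted or stopped tracking the TaskExecutor during the long outage (for example after heartbeats timed out), so the old registration is no longer recognized and the TaskManager process exits fatally. If the cluster repeatedly hits timeouts while the sink is backed up, these flink-conf.yaml options control the relevant timeouts (the values below are illustrative, not recommendations):

```yaml
# Heartbeat between JobManager/ResourceManager and TaskManagers.
heartbeat.interval: 10000     # ms, the default
heartbeat.timeout: 120000     # ms, raised from the 50000 default

# How long a TaskExecutor keeps retrying its registration before giving up.
taskmanager.registration.timeout: 10 min   # default is 5 min
```

After such a fatal error the TaskManager must be restarted externally (the deployment framework, e.g. YARN or Kubernetes, normally does this); raising timeouts only makes the registration path more tolerant of slow or flapping components.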
Re: Idle source configuration per topic with the Kafka Table API connector
Hi Svend,

Your solution should work well in Flink 1.13.0 and later, because those versions provide many related improvements.

> as per [1]

Yes. "table.exec.source.idle-timeout" is not a table-level parameter but a global one; it applies to all table sources that declare a watermark clause and do not use SOURCE_WATERMARK().

> as per [2]

Yes.

> If that is correct, I guess I can simply use the DataStream connector for that specific topic and then convert it to a Table.

Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a Table, like in the following demo:

    Table table = tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            // ... other logic
            .watermark("columnName", "SOURCE_WATERMARK()")
            .build());

I would like to invite Jark and Timo to double-check; they are more familiar with this issue.

Best,
JING ZHANG

Svend wrote on Sat, May 29, 2021 at 3:34 PM:

> Hi everyone,
>
> My Flink streaming application consumes several Kafka topics, one of which
> receives traffic in a burst once per day.
>
> I would like that topic not to hold back the progress of the watermark.
>
> Most of my code is currently using the SQL API and in particular the Table
> API Kafka connector.
>
> I have read about the idle source configuration mechanism, could you
> please confirm my understanding that:
>
> * as per [1]: when I'm using the Table API Kafka connector, we currently
> do not have the possibility to specify the idle source parameter
> specifically for each topic, although we can set it globally on
> the StreamTableEnvironment with the "table.exec.source.idle-timeout"
> parameter
>
> * as per [2]: when using the DataStream Kafka connector, we can set the
> idle source parameter specifically for each topic by specifying
> ".withIdleness()" to the WatermarkStrategy.
>
> If that is correct, I guess I can simply use the DataStream connector for
> that specific topic and then convert it to a Table.
>
> Thanks a lot!
>
> Svend
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
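As a small illustration of the global setting discussed in this thread, here is a hedged sketch of applying "table.exec.source.idle-timeout" programmatically; the "1 min" value is purely illustrative, and the class name is hypothetical (assumes Flink 1.13 with the Table API bridge on the classpath):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class GlobalIdleTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Global setting: applies to every table source that declares a
        // watermark and does not use SOURCE_WATERMARK(); there is no
        // per-table variant of this option.
        tableEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.source.idle-timeout", "1 min");
    }
}
```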
Re: Flink in k8s operators list
Hi Ilya,

At my company we're currently using the GCP k8s operator (2nd on your list). Our usage is very moderate, but so far it works great for us.

We appreciate that when upgrading the application, it automatically triggers a savepoint during shutdown and resumes from it when restarting. It also allows taking savepoints at regular intervals (we currently take one per day). We're using it with Flink 1.12.4 and AWS EKS. Getting the Flink metrics and logs exported to our monitoring system worked out of the box. Configuring IAM roles and the k8s service account for saving checkpoints and savepoints to S3 required a bit more fiddling, although we got it working. Happy to share code snippets about any of that if that's useful :)

It was last updated with Flink 1.11 in mind, so there is currently no built-in support for the reactive scaling mode recently added in Flink 1.13. One worrying point is that the maintainers of the repo seem to have gone silent in March this year. There is a small and active community around it, though, and issues and PRs keep arriving and waiting for feedback. It's all free and OSS, so who are we to complain? Still, it's an important point of attention.

Hope this helps,

Svend

On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
> Hi there,
>
> I'm doing a little research about the easiest way to deploy a Flink job to a k8s
> cluster and manage its lifecycle by a *k8s operator*. The list of solutions is
> below:
> - https://github.com/fintechstudios/ververica-platform-k8s-operator
> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> - https://kudo.dev/docs/examples/apache-flink.html
> - https://github.com/wangyang0918/flink-native-k8s-operator
>
> If you are using something that is not listed above, please share! Any details about
> how a specific solution works are greatly appreciated.
>
> Thanks in advance
Idle source configuration per topic with the Kafka Table API connector
Hi everyone,

My Flink streaming application consumes several Kafka topics, one of which receives traffic in a burst once per day. I would like that topic not to hold back the progress of the watermark. Most of my code is currently using the SQL API and in particular the Table API Kafka connector.

I have read about the idle source configuration mechanism; could you please confirm my understanding that:

* as per [1]: when I'm using the Table API Kafka connector, we currently do not have the possibility to specify the idle source parameter specifically for each topic, although we can set it globally on the StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter

* as per [2]: when using the DataStream Kafka connector, we can set the idle source parameter specifically for each topic by specifying ".withIdleness()" on the WatermarkStrategy.

If that is correct, I guess I can simply use the DataStream connector for that specific topic and then convert it to a Table.

Thanks a lot!

Svend

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
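The per-topic DataStream approach described above could be sketched roughly as follows; topic, broker, and class names are hypothetical, and this assumes Flink 1.13 with the flink-connector-kafka dependency:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BurstyTopicSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical
        props.setProperty("group.id", "bursty-consumer");      // hypothetical

        FlinkKafkaConsumer<String> burstySource =
                new FlinkKafkaConsumer<>(
                        "bursty-topic", new SimpleStringSchema(), props);

        // Per-source idleness: after 5 minutes without records, this topic's
        // partitions are marked idle and stop holding back the watermark.
        // Assigning the strategy on the consumer keeps per-partition watermarks.
        burstySource.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                        .withIdleness(Duration.ofMinutes(5)));

        DataStream<String> stream = env.addSource(burstySource);
        // ... then convert the stream to a Table with SOURCE_WATERMARK(),
        // and use the Table API Kafka connector for the other topics.
        env.execute();
    }
}
```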