Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-05-29 Thread Kai Fu
Hi team,

We encountered an issue during recovery from a checkpoint. The job is
recovering because the downstream Kafka sink was full for a while (about 4
hours), during which the job failed and kept trying to recover. Even after
we scaled up the Kafka cluster, the job cannot recover from the checkpoint
successfully and shows the following exception. Is there any guidance on
how to locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30 01:31:21,42

Re: Idle source configuration per topic with the Kafka Table API connector

2021-05-29 Thread JING ZHANG
Hi Svend,
Your solution should work well in Flink 1.13.0 and later, because those
versions provide many related improvements.

> as per [1]
Yes, "table.exec.source.idle-timeout" is not a table-level parameter but a
global one. It applies to all table sources that declare a watermark clause
but do not use SOURCE_WATERMARK().
> as per [2]
Yes.
> If that is correct, I guess I can simply use the DataStream connector for
> that specific topic and then convert it to a Table.
Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a
Table, like in the following demo:

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            // ... other schema declarations ...
            .watermark("columnName", "SOURCE_WATERMARK()")
            .build());
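And for the bursty topic itself, the DataStream side could look roughly like the sketch below (the topic name, schema and timeout values are placeholders I made up, so please adapt them; only this consumer gets `.withIdleness()`, which is what makes the idleness per-topic):

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BurstySource {

    // Build the bursty-topic stream with its own idle-aware watermark strategy,
    // so this topic alone is allowed to go idle without holding back the
    // downstream watermark once it has been silent for one minute.
    static DataStream<String> burstyStream(StreamExecutionEnvironment env, Properties kafkaProps) {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("bursty-topic", new SimpleStringSchema(), kafkaProps);
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofMinutes(1)));
        return env.addSource(consumer);
        // ... then convert with tableEnv.fromDataStream(...) and a schema that
        // declares SOURCE_WATERMARK(), as in the demo above.
    }
}
```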

I would like to invite Jark and Timo to double-check; they are more
familiar with this issue.
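For intuition on why one silent topic holds everything back: the combined watermark is the minimum over all inputs that are not marked idle. Below is a self-contained sketch of just that rule, as a conceptual model in plain Java, not Flink's actual internals (the real logic lives in the watermark generators and input channel status tracking):

```java
import java.util.Map;

public class WatermarkCombiner {

    // Combined watermark = min over inputs that are NOT idle, where an input
    // counts as idle once no event has arrived within idleTimeoutMs.
    // Each map value is {currentWatermark, lastEventTime}, both in epoch millis.
    static long combinedWatermark(Map<String, long[]> inputs, long now, long idleTimeoutMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (long[] v : inputs.values()) {
            boolean idle = now - v[1] > idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, v[0]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        Map<String, long[]> inputs = Map.of(
                "busy-topic",   new long[] {1_000L, 99_000L},  // saw an event 1s ago
                "bursty-topic", new long[] {  200L, 10_000L}); // silent for 90s
        // No idle timeout: the silent topic pins the combined watermark at 200.
        System.out.println(combinedWatermark(inputs, 100_000L, Long.MAX_VALUE)); // 200
        // With a 60s idle timeout the silent topic is ignored, so 1000 wins.
        System.out.println(combinedWatermark(inputs, 100_000L, 60_000L)); // 1000
    }
}
```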

Best,
JING ZHANG


Svend wrote on Sat, May 29, 2021 at 3:34 PM:

> Hi everyone,
>
> My Flink streaming application consumes several Kafka topics, one of which
> receives traffic in bursts once per day.
>
> I would like that topic not to hold back the progress of the watermark.
>
> Most of my code is currently using the SQL API and in particular the Table
> API Kafka connector.
>
> I have read about the idle source configuration mechanism; could you
> please confirm my understanding that:
>
> * as per [1]: when I'm using the Table API Kafka connector, we currently
> do not have the possibility to specify the idle source parameter
> specifically for each topic, although we can set it globally on
> the StreamTableEnvironment with the "table.exec.source.idle-timeout"
> parameter
>
> * as per [2]: when using the DataStream Kafka connector, we can set the
> idle source parameter specifically for each topic by specifying
> ".withIdleness()" on the WatermarkStrategy.
>
> If that is correct, I guess I can simply use the DataStream connector for
> that specific topic and then convert it to a Table.
>
> Thanks a lot!
>
> Svend
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
>


Re: Flink in k8s operators list

2021-05-29 Thread Svend
Hi Ilya,

At my company we're currently using the GCP k8s operator (2nd on your list). 
Our usage is very moderate, but so far it works great for us.

We appreciate that, when upgrading the application, it automatically triggers a 
savepoint during shutdown and resumes from it when restarting. It also allows 
taking savepoints at regular intervals (we currently take one per day).

We're using it with Flink 1.12.4 and AWS EKS.

Getting the Flink metrics and logs exported to our monitoring system worked out 
of the box. 

Configuring IAM roles and the K8s service account for saving checkpoints and 
savepoints to S3 required a bit more fiddling, although we got it working.

Happy to share a code snippet about any of that if that's useful :)
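To give a rough idea, the S3 wiring we ended up with looks something like the sketch below, based on EKS's IAM Roles for Service Accounts (the role ARN, bucket and resource names are placeholders; details vary per setup):

```yaml
# Kubernetes ServiceAccount bound to an IAM role via IRSA on EKS;
# the Flink pods run under this account to get S3 permissions.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-sa
  namespace: flink
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/flink-state-role
---
# Corresponding flink-conf.yaml fragment (as a ConfigMap or operator spec):
# state.checkpoints.dir: s3://my-flink-bucket/checkpoints
# state.savepoints.dir: s3://my-flink-bucket/savepoints
```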

It was last updated with Flink 1.11 in mind, so there is currently no built-in 
support for the reactive scaling mode recently added in Flink 1.13.

One worrying point, though, is that the maintainers of the repo seem to have 
gone silent since March this year. There is a small and active community around 
it, and issues and PRs keep arriving and are waiting for feedback. It's all 
free and OSS, so who are we to complain? Still, it's an important point to 
keep in mind.

Hope this helps,

Svend





On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
> Hi there,
> 
> I’m doing a little research on the easiest way to deploy a Flink job to a k8s 
> cluster and manage its lifecycle with a *k8s operator*. The list of solutions is 
> below:
> - https://github.com/fintechstudios/ververica-platform-k8s-operator
> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> - https://kudo.dev/docs/examples/apache-flink.html
> - https://github.com/wangyang0918/flink-native-k8s-operator
> 
> If you are using something that is not listed above, please share! Any details about 
> how a specific solution works are greatly appreciated.
> 
> Thanks in advance


Idle source configuration per topic with the Kafka Table API connector

2021-05-29 Thread Svend
Hi everyone,

My Flink streaming application consumes several Kafka topics, one of which 
receives traffic in bursts once per day.

I would like that topic not to hold back the progress of the watermark.

Most of my code is currently using the SQL API and in particular the Table API 
Kafka connector.

I have read about the idle source configuration mechanism; could you please 
confirm my understanding that:

* as per [1]: when I'm using the Table API Kafka connector, we currently do not 
have the possibility to specify the idle source parameter specifically for each 
topic, although we can set it globally on the StreamTableEnvironment with the 
"table.exec.source.idle-timeout" parameter

* as per [2]: when using the DataStream Kafka connector, we can set the idle 
source parameter specifically for each topic by specifying ".withIdleness()" on 
the WatermarkStrategy.

If that is correct, I guess I can simply use the DataStream connector for that 
specific topic and then convert it to a Table.

Thanks a lot!

Svend



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission