Hello Jan,

My Flink cluster is running on a single-node Kubernetes cluster (RKE). I have the JVM Heap Size set to 2.08 GB and the Managed Memory set to 2.93 GB. The TaskManager reaches the maximum JVM Heap Size after about one hour and then fails. Here is a snippet from the TaskManager log:

2021-05-27 15:36:36,040 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2021-05-27 15:36:36,041 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=500.000mb (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 85433366f8bf1c5efd3b88f634676764 for job c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 00000000000000000000000000000000.
2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 85433366f8bf1c5efd3b88f634676764.
2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 00000000-0000-0000-0000-000000000000.
2021-05-27 15:36:36,044 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2021-05-27 15:36:36,045 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.

I guess the simple resolution is to increase the JVM Heap Size?

On Thu, May 27, 2021 at 10:51 AM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

> Hi Robert,
>
> do you have some additional info? For example the last log message of the
> unreachable TaskManagers. Is the Job running in kubernetes? What backend
> are you using?
>
> From the first looks of it, I have seen this behaviour mostly in cases
> where one or more taskmanagers shut down due to GarbageCollection issues or
> OutOfMemory-Errors.
>
>
> Best regards
>
> Jan
> On 27.05.21 16:44, Robert Cullen wrote:
>
> I have a job that fails after @1 hour due to a TaskManager Timeout. How
> can I prevent this from happening?
>
> 2021-05-27 10:24:21
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>     at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>     at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 10.42.0.49:6122-e26293 timed out.
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
>     ... 27 more
>
> --
> Robert Cullen
> 240-475-4490
>
> --
> neuland – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
>
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>

--
Robert Cullen
240-475-4490
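
P.S. As a rough cross-check of the memory figures in this message, the per-slot values in the ResourceProfile log line can be summed up to TaskManager totals. This is only a sketch: the slot count of 4 and the 128 MB framework heap (Flink's default) are assumptions, neither is stated anywhere in this thread, and the names below are made up for illustration.

```python
MIB = 1024 * 1024
GIB = 1024 * MIB

# Per-slot values copied from the ResourceProfile in the TaskManager log.
SLOT_TASK_HEAP_BYTES = 524_288_000  # taskHeapMemory=500.000mb
SLOT_MANAGED_BYTES = 786_432_000    # managedMemory=750.000mb

# Assumptions, NOT stated in the thread.
ASSUMED_SLOTS = 4                          # slots per TaskManager
ASSUMED_FRAMEWORK_HEAP_BYTES = 128 * MIB   # Flink's default framework heap


def taskmanager_totals(slots, framework_heap_bytes):
    """Return (jvm_heap_bytes, managed_bytes) for one TaskManager."""
    jvm_heap = slots * SLOT_TASK_HEAP_BYTES + framework_heap_bytes
    managed = slots * SLOT_MANAGED_BYTES
    return jvm_heap, managed


jvm_heap, managed = taskmanager_totals(ASSUMED_SLOTS, ASSUMED_FRAMEWORK_HEAP_BYTES)
print(f"JVM heap:       {jvm_heap / GIB:.2f} GiB")
print(f"Managed memory: {managed / GIB:.2f} GiB")
```

Under those assumptions the totals come out at 2.08 and 2.93, which matches the JVM Heap Size and Managed Memory I quoted at the top, so the reported heap appears to be nothing more than the per-slot task heap times the slot count plus the framework heap.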