Matthias,

I increased the JVM heap size as Jan suggested, and it appears to be a memory leak in the user code, although I'm not sure why, since this is a simple job that uses a loop to simulate data being written to an S3 data store. Yes, those are the correct logs: they show no apparent problem, but their timestamps correspond to the job failure. Forgive me, but I don't know how to analyze a heap dump.
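On the heap dump question, here is a minimal sketch, assuming a HotSpot-based JVM, of how a dump can be written from inside a running JVM through the standard HotSpotDiagnosticMXBean. The class name HeapDumpSketch and the file name taskmanager.hprof are made up for illustration and are not taken from the job discussed in this thread. The resulting .hprof file can be opened in a heap analyzer such as Eclipse MAT or VisualVM to see which object types hold most of the heap.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink: dumps the heap of the JVM it runs in.
final class HeapDumpSketch {

    /**
     * Writes a heap dump of the current JVM to the given file.
     * The file must not exist yet; recent JDKs also require a ".hprof" suffix.
     * With liveObjectsOnly = true, only reachable objects are dumped.
     */
    static void dumpHeap(Path target, boolean liveObjectsOnly) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(target.toString(), liveObjectsOnly);
    }

    public static void main(String[] args) throws IOException {
        // A fresh temp directory guarantees the target file does not exist yet.
        Path dir = Files.createTempDirectory("heapdump");
        Path target = dir.resolve("taskmanager.hprof"); // illustrative file name
        dumpHeap(target, true);
        System.out.println("Wrote " + Files.size(target) + " bytes to " + target);
    }
}
```

For a TaskManager that dies before a dump can be requested, the JVM flag -XX:+HeapDumpOnOutOfMemoryError (passed, for example, through Flink's env.java.opts.taskmanager setting) writes a dump automatically when an OutOfMemoryError is thrown. Whether that fits a given Kubernetes setup needs checking, since the file is lost with the pod unless it is written to a persistent volume.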
On Fri, May 28, 2021 at 8:27 AM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Robert,
> increasing heap memory usage could be due to some memory leak in the user
> code. Have you analyzed a heap dump? About the TM logs you shared. I don't
> see anything suspicious there. Nothing about memory problems. Are those the
> correct logs?
>
> Best,
> Matthias
>
> On Thu, May 27, 2021 at 6:01 PM Jan Brusch <jan.bru...@neuland-bfi.de>
> wrote:
>
>> Hi Robert,
>>
>> that sounds like a case of either your application state ultimately being
>> bigger than the available RAM or a memory leak in your application (e.g.,
>> some states are not properly cleaned out after they are not needed anymore).
>>
>> If you have the available resources you could try and increase the
>> TaskManager RAM size by a large amount and see where RAM usage tops out. If
>> it ever does... in case of a memory leak it would grow indefinitely. Then
>> you could reason about how to fix the memory leak or how to achieve your
>> goal with a smaller application state.
>>
>> A remedy for application states larger than your available RAM is to use
>> the RocksDB State backend, which allows for states larger than your
>> application RAM. But that requires your kubernetes nodes to be equipped
>> with a fast hard drive (SSD, optimally).
>>
>> That's how I would approach your problem...
>>
>> Hope that helps
>>
>> Jan
>>
>> On 27.05.21 17:51, Robert Cullen wrote:
>>
>> Hello Jan,
>>
>> My flink cluster is running on a kubernetes single node (rke). I have the
>> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
>> TaskManager reaches the max JVM Heap Size after about one hour then fails.
>>
>> Here is a snippet from the TaskManager log:
>>
>> 2021-05-27 15:36:36,040 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
>> 2021-05-27 15:36:36,041 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=500.000mb (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
>> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
>> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 85433366f8bf1c5efd3b88f634676764 for job c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 00000000000000000000000000000000.
>> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 85433366f8bf1c5efd3b88f634676764.
>> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
>> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 00000000-0000-0000-0000-000000000000.
>> 2021-05-27 15:36:36,044 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
>> 2021-05-27 15:36:36,045 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.
>>
>> I guess the simple resolution is to increase the JVM Heap Size?
>>
>> On Thu, May 27, 2021 at 10:51 AM Jan Brusch <jan.bru...@neuland-bfi.de>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> do you have some additional info? For example the last log message of
>>> the unreachable TaskManagers. Is the Job running in kubernetes? What
>>> backend are you using?
>>>
>>> From the first looks of it, I have seen this behaviour mostly in cases
>>> where one or more taskmanagers shut down due to GarbageCollection issues or
>>> OutOfMemory-Errors.
>>>
>>> Best regards
>>>
>>> Jan
>>>
>>> On 27.05.21 16:44, Robert Cullen wrote:
>>>
>>> I have a job that fails after @1 hour due to a TaskManager Timeout. How
>>> can I prevent this from happening?
>>>
>>> 2021-05-27 10:24:21
>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>>>     at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>>>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>>>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>>>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>>>     at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>>>     at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>>>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>>>     at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>>>     at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>>>     at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 10.42.0.49:6122-e26293 timed out.
>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
>>>     ... 27 more
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>> --
>>> neuland – Büro für Informatik GmbH
>>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>>
>>> Telefon (0421) 380107 57
>>> Fax (0421) 380107 99
>>> https://www.neuland-bfi.de
>>> https://twitter.com/neuland
>>> https://facebook.com/neulandbfi
>>> https://xing.com/company/neulandbfi
>>>
>>> Geschäftsführer: Thomas Gebauer, Jan Zander
>>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>>> USt-ID. DE 246585501

--
Robert Cullen
240-475-4490