Matthias,  I increased the JVM Heap size as Jan suggested and it appears to
be a memory leak in the user code (although I'm not sure why since this is
a simple job that uses a loop to simulate data being written to an S3 data
store).  Yes, the logs show no apparent problem but the timestamp
corresponds to the job failure.  Forgive me but I don't know how to analyze
a heap dump.

On Fri, May 28, 2021 at 8:27 AM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Robert,
> increasing heap memory usage could be due to some memory leak in the user
> code. Have you analyzed a heap dump? About the TM logs you shared. I don't
> see anything suspicious there. Nothing about memory problems. Are those the
> correct logs?
>
> Best,
> Matthias
>
> On Thu, May 27, 2021 at 6:01 PM Jan Brusch <jan.bru...@neuland-bfi.de>
> wrote:
>
>> Hi Robert,
>>
>> that sounds like a case of either your application state ultimately being
>> bigger than the available RAM or a memory leak in your application (e.g.,
>> some states are not properly cleaned out after they are not needed anymore).
>>
>> If you have the available resources you could try and increase the
>> TaskManager RAM size by a large amount and see where RAM usage tops out. If
>> it ever does... in case of a memory leak it would grow indefinitely. Then
>> you could reason about how to fix the memory leak or how to achieve your
>> goal with a smaller application state.
>>
>> A remedy for application states larger than your available RAM is to use
>> the RocksDB State backend, which allows for states larger than your
>> application RAM. But that requires your kubernetes nodes to be equipped
>> with a fast hard drive (SSD, optimally).
>>
>> That's how I would approach your problem...
>>
>>
>> Hope that helps
>>
>> Jan
>> On 27.05.21 17:51, Robert Cullen wrote:
>>
>> Hello Jan,
>>
>> My flink cluster is running on a kubernetes single node (rke). I have the
>> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
>> TaskManger reaches the max JVM Heap Size after about one hour then fails.
>> Here is a snippet from the TaskManager log:
>>
>> 2021-05-27 15:36:36,040 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
>> JobManager address, beginning registration
>> 2021-05-27 15:36:36,041 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
>> Successful registration at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
>> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
>> reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>> TaskSlot(index:2, state:ALLOCATED, resource profile: 
>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=500.000mb 
>> (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb 
>> (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 
>> 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove 
>> job c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
>> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive 
>> slot request 85433366f8bf1c5efd3b88f634676764 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 
>> 00000000000000000000000000000000.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
>> slot for 85433366f8bf1c5efd3b88f634676764.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
>> c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
>> register at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 
>> 00000000-0000-0000-0000-000000000000.
>> 2021-05-27 15:36:36,044 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
>> JobManager address, beginning registration
>> 2021-05-27 15:36:36,045 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
>> Successful registration at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55.
>>
>> I guess the simple resolution is to increase the JVM Heap Size?
>>
>> On Thu, May 27, 2021 at 10:51 AM Jan Brusch <jan.bru...@neuland-bfi.de>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> do you have some additional info? For example the last log message of
>>> the unreachable TaskManagers. Is the Job running in kubernetes? What
>>> backend are you using?
>>>
>>> From the first looks of it, I have seen this behaviour mostly in cases
>>> where one or more taskmanagers shut down due to GarbageCollection issues or
>>> OutOfMemory-Errors.
>>>
>>>
>>> Best regards
>>>
>>> Jan
>>> On 27.05.21 16:44, Robert Cullen wrote:
>>>
>>> I have a job that fails after @1 hour due to a TaskManager Timeout. How
>>> can I prevent this from happening?
>>>
>>> 2021-05-27 10:24:21
>>> org.apache.flink.runtime.JobException: Recovery is suppressed by 
>>> NoRestartBackoffTimeStrategy
>>>     at 
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>>>     at 
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>>>     at 
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>>>     at 
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>>>     at 
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>>>     at 
>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>>>     at 
>>> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>>>     at 
>>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>>>     at 
>>> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>>>     at 
>>> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>>>     at 
>>> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>>>     at 
>>> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>>>     at 
>>> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>>>     at 
>>> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>>>     at 
>>> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>>>     at 
>>> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
>>>     at 
>>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
>>>     at 
>>> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>>>     at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>>>     at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>>>     at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>>     at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at 
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at 
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager 
>>> with id 10.42.0.49:6122-e26293 timed out.
>>>     at 
>>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
>>>     ... 27 more
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>> --
>>> neuland  – Büro für Informatik GmbH
>>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>>
>>> Telefon (0421) 380107 57
>>> Fax (0421) 380107 99https://www.neuland-bfi.de
>>> https://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfi
>>>
>>>
>>> Geschäftsführer: Thomas Gebauer, Jan Zander
>>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>>> USt-ID. DE 246585501
>>>
>>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>> --
>> neuland  – Büro für Informatik GmbH
>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>
>> Telefon (0421) 380107 57
>> Fax (0421) 380107 99https://www.neuland-bfi.de
>> https://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfi
>>
>>
>> Geschäftsführer: Thomas Gebauer, Jan Zander
>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>> USt-ID. DE 246585501
>>
>>

-- 
Robert Cullen
240-475-4490

Reply via email to