Thanks Piotr and Stefan,

The problem was the heap memory overhead on the JobManager caused by increasing the number of retained checkpoints. It was solved once I reverted that value to one.
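For reference, these are the flink-conf.yaml keys involved; the heap value below is just an illustration, not an actual production setting:

    # number of completed checkpoints the JobManager keeps around
    state.checkpoints.num-retained: 1
    # JobManager heap in MB (Flink 1.3.x key); this is what would have to grow if
    # num-retained were pushed back into the thousands - Piotr's estimate below
    # works out to roughly 1 GB of extra heap for ~2880 retained checkpoints
    jobmanager.heap.mb: 5120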
BR

This is the actual error according to the JobManager log at the time of the OOM:

2018-01-08 22:27:09,293 WARN org.jboss.netty.channel.socket.nio.AbstractNioSelector - Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2018-01-08 22:27:15,796 ERROR akka.actor.ActorSystemImpl - Uncaught error from thread [flink-akka.actor.default-dispatcher-22840] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,288 ERROR akka.actor.ActorSystemImpl - Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-22839] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,882 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard root cache directory /tmp/flink-web-f75e187d-3d08-4864-ba08-1740c8586be1
2018-01-08 22:27:17,394 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard jar upload directory /tmp/flink-web-2c8657f2-9b87-4964-bde4-9997ef31966d
2018-01-08 22:27:19,863 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:44378

Here's a test with Abba after it has accumulated state for 21 hours. It seems that creating a nightly savepoint won't necessarily be scalable, so being able to use incremental checkpoints would still seem appealing, if possible. Or why not make a savepoint somehow by copying the required files of a checkpoint instead - but I doubt that Flink would support that.

*José Miguel Tejedor Fernández*
Server developer
jose.fernan...@rovio.com

Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
www.rovio.com

On Wed, Jan 10, 2018 at 3:08 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> This Task Manager log suggests that the problem lies on the Job Manager side (no visible gap in the logs, the GC time reported is accumulated, and 31 seconds accumulated over 963 GC collections is a low value). Could you show the Job Manager log itself? Probably it's the one that's causing the TaskManager to time out.
>
> On the other hand, I see that the Task Manager max heap size is ~5GB and I assume this is the same setting for the Job Manager. As Stefan pointed out, there is some memory overhead on the Job Manager for each retained checkpoint, around a couple of hundred bytes (maybe even 1KB) per operator instance. By doing quick math:
>
> 2880 checkpoints * 10 task managers * 10 operators in the job * 8 parallelism per task manager * 500 bytes = ~1GB
>
> The answer might be that you just need to increase the Job Manager max heap to retain 2880 checkpoints.
>
> Piotrek
>
> On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez <jose.fernan...@rovio.com> wrote:
>
> Hi,
>
>> I wonder what reason you might have that you ever want such a huge number of retained checkpoints?
>
> The Flink jobs running on the EMR cluster require a checkpoint at midnight.
> (In our use case we need to sync a loaded delta to a third-party partner together with the streamed data.) The delta loads the whole day's data, and that's why we wanted to have the midnight checkpoint available to start from. We could also make a savepoint at midnight, but it's not as handy (we would need to build our own tooling to do it), and it can't benefit from the smaller latency of an incremental checkpoint. Another thing is that our own savepoint tooling would be a bit hard to monitor. Besides, retaining several checkpoints created every minute would also allow us to load a delta at any time. Please let me know if there are better ways of achieving this.
>
>> From where does the log trace come from?
>
> It comes from the TaskManager.
>
>> Please search on the opposite side of the timing-out connection for possible root causes of the timeout, including:
>> - possible errors/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
>> - machine health (CPU usage, disk usage, network connections)
>
> It seems that the TaskManager disconnects from the JobManager and then cannot reach it again, and I cannot tell the reason. I think the machine health metrics mentioned above seem to be OK. Would you say the *Direct memory stats* usage is correct? What is the way to check the GC pauses?
> These are some traces from the TaskManager log, before/after it detached from the JobManager:
>
> 2018-01-08 22:26:37,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)]
> 2018-01-08 22:26:42,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
> 2018-01-08 22:26:42,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB (used/committed/max)]
> 2018-01-08 22:26:42,264 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,999 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341]
> 2018-01-08 22:26:43,034 INFO org.apache.flink.yarn.YarnTaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable
> 2018-01-08 22:26:43,035 INFO org.apache.flink.yarn.YarnTaskManager - Cancelling all computations and discarding all cached data.
> 2018-01-08 22:26:43,037 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO org.apache.flink.runtime.taskmanager.Task - Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable
>     at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
>     at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>     at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: CounterSink (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
>
>
> *José Miguel Tejedor Fernández*
> Server developer
> jose.fernan...@rovio.com
>
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com
>
>
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> there is no known limitation in the strict sense, but you might run out of DFS space or job manager memory if you keep around a huge number of checkpoints. I wonder what reason you might have that you ever want such a huge number of retained checkpoints?
>> Usually keeping one checkpoint should do the job, maybe a couple more if you are very afraid about corruption that goes beyond your DFS's capabilities to handle it. Is there any reason for that, or maybe a misconception about what increasing the number of retained checkpoints is good for?
>>
>> Best,
>> Stefan
>>
>>
>> On 10.01.2018 at 08:54, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> Increasing akka's timeouts is rarely a solution for any problem - it either does not help or just masks the issue, making it less visible. But yes, it is possible to bump the limits: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
>>
>> I don't think that state.checkpoints.num-retained was intended to handle such large numbers of retained checkpoints, so maybe there are some known/unknown limitations. Stefan, do you know something in this regard?
>>
>> A parallel thing to do, as for any other akka timeout, is to track down its root cause. This one warning line doesn't tell much. Where does it come from? Client log? Job manager log? Task manager log? Please search on the opposite side of the timing-out connection for possible root causes of the timeout, including:
>> - possible errors/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
>> - machine health (CPU usage, disk usage, network connections)
>>
>> Piotrek
>>
>> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <jose.fernan...@rovio.com> wrote:
>>
>> Hello,
>>
>> I have several stream jobs running (v. 1.3.1) in production which always fail after a fixed period of around 30 h of execution. This is the WARN trace before failing:
>>
>> Association with remote system [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].
>>
>> The main change done in the job configuration was to increase state.checkpoints.num-retained from 1 to *2880*. I am using RocksDB with asynchronous snapshots to persist the state. (I attach some screenshots with the checkpoint configuration from the webUI.)
>>
>> - Could my assumption be correct that the increase of checkpoints.num-retained is causing the problem? Any known issue regarding this?
>>
>> - Besides, is there any way to increase the Akka handshake timeout from the current 20000 ms to a higher value? I considered that it may be convenient to increase the timeout to 1 minute instead.
>>
>> BR
>>
>> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>
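Regarding the nightly-savepoint "own tooling" discussed above: a minimal sketch of what that could look like with the standard Flink CLI, assuming a YARN session; the application id, job id and target path are placeholders, and the exact flags should be double-checked against the 1.3 CLI docs:

    #!/usr/bin/env bash
    # Nightly cron job (illustrative): trigger a savepoint for the running job.
    # APP_ID is the YARN application id of the Flink session; JOB_ID can be
    # looked up with "bin/flink list -r". Both values below are placeholders.
    APP_ID=application_1515XXXXXXXXX_0001
    JOB_ID=00000000000000000000000000000000
    bin/flink savepoint -yid "$APP_ID" "$JOB_ID" s3://my-bucket/savepoints/$(date +%F)

A savepoint triggered this way is a full (non-incremental) snapshot, which is exactly the scalability trade-off raised earlier in the thread for jobs with large accumulated state.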