Thanks Piotr and Stefan,

The problem was the heap memory overhead in the JobManager when increasing
the number of retained checkpoints. It was solved once I reverted that value
to one.
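
For reference, this is roughly what the relevant flink-conf.yaml entry looks
like after the revert (a minimal sketch of the setting in question):

    # only keep the latest successful checkpoint around
    state.checkpoints.num-retained: 1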

BR

This is the actual error in the JobManager log at the time of the OOM:

2018-01-08 22:27:09,293 WARN
org.jboss.netty.channel.socket.nio.AbstractNioSelector
      - Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2018-01-08 22:27:15,796 ERROR akka.actor.ActorSystemImpl
                - Uncaught error from thread
[flink-akka.actor.default-dispatcher-22840]
shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,288 ERROR akka.actor.ActorSystemImpl
                - Uncaught error from thread
[flink-akka.remote.default-remote-dispatcher-22839]
shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,882 INFO
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
       - Removing web dashboard root cache directory
/tmp/flink-web-f75e187d-3d08-4864-ba08-1740c8586be1
2018-01-08 22:27:17,394 INFO
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
       - Removing web dashboard jar upload directory
/tmp/flink-web-2c8657f2-9b87-4964-bde4-9997ef31966d
2018-01-08 22:27:19,863 INFO  org.apache.flink.runtime.blob.BlobServer
                - Stopped BLOB server at 0.0.0.0:44378



Here's a test with Abba after it has accumulated state for 21 hours.
It seems that creating a nightly savepoint won't necessarily be scalable,
so being able to use incremental checkpoints would still seem appealing, if
possible.
Or why not create a savepoint by copying the required files of a
checkpoint instead - but I doubt that Flink supports that.
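
For what it's worth, with externalized checkpoints the files shouldn't need to
be copied by hand: a new job submission can be pointed at the checkpoint's
metadata, the same way as at a savepoint. A rough sketch of the CLI usage,
where the job ID, jar and paths are placeholders:

    # trigger a savepoint manually (target directory is optional if a default is configured)
    bin/flink savepoint <jobId> hdfs:///flink/savepoints

    # resume either from a savepoint or from an externalized checkpoint's metadata
    bin/flink run -s <pathToSavepointOrCheckpointMetadata> my-job.jar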



*José Miguel Tejedor Fernández*
Server developer
jose.fernan...@rovio.com

Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
www.rovio.com



On Wed, Jan 10, 2018 at 3:08 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> This Task Manager log suggests that the problem lies on the Job Manager
> side (no visible gap in the logs, the GC time reported is accumulated, and 31
> seconds accumulated over 923 GC collections is a low value). Could you show
> the Job Manager log itself? Probably it's the one that's causing the
> TaskManager to time out.
>
> On the other hand, I see that the Task Manager max heap size is ~5GB and I
> assume this is the same setting for the Job Manager. As Stefan pointed out,
> there is some memory overhead on the Job Manager for retaining checkpoints,
> and it is around a couple of hundred bytes (maybe even 1KB) per operator
> instance. Doing some quick math:
>
> 2880 checkpoints * 10 task managers * 10 operators in the job * 8
> parallelism per task manager * 500 bytes = ~1GB
>
> The answer might be that you just need to increase the Job Manager max
> heap to retain 2880 checkpoints.
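>
> A hedged sketch of how that could look on a YARN setup (the config key is
> the Flink 1.3 one; all values below are placeholders, not recommendations):
>
>     # flink-conf.yaml
>     jobmanager.heap.mb: 4096
>
>     # or when starting the YARN session (-jm sets the Job Manager memory in MB)
>     bin/yarn-session.sh -n 10 -jm 4096 -tm 8192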
>
> Piotrek
>
> On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez <
> jose.fernan...@rovio.com> wrote:
>
> Hi,
>
> I wonder what reason you might have to ever want such a huge number
>> of retained checkpoints?
>
>
> The Flink jobs running on the EMR cluster require a checkpoint at midnight.
> (In our use case we need to sync a loaded delta of the streamed data to a
> third-party partner.) The delta loads the whole day's data, and that's why we
> want the midnight checkpoint available to start from.
> We could also make a savepoint at midnight, but it's not as handy (we
> would need to build our own tooling to do it), and it can't benefit from
> the smaller latency of an incremental checkpoint. Another thing is that our
> own savepoint tooling would be a bit hard to monitor. Besides, retaining
> several checkpoints created every minute would also allow us to load a delta
> at any time. Please, if there are better ways of achieving this, let me know.
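>
> For reference, the tooling itself could be as small as a nightly cron entry
> calling the Flink CLI - a rough sketch, where the job ID and target directory
> are placeholders; the hard part would rather be monitoring it, as mentioned
> above:
>
>     # crontab: trigger a savepoint every midnight
>     0 0 * * * /opt/flink/bin/flink savepoint <jobId> hdfs:///flink/savepoints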
>
> Where does the log trace come from?
>
>
> It comes from the TaskManager.
>
> Please search on the other side of the timed-out connection for the
>> possible root cause of the timeout, including:
>> - possible errors/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural
>> gaps in the logs)
>> - machine health (CPU usage, disk usage, network connections)
>
>
> It seems that the TaskManager disconnects from the JobManager and then
> cannot reach it again, and I cannot tell the reason. The machine health
> metrics mentioned above seem to be OK. Would you say the *Direct memory
> stats* usage is correct? What is the way to check the GC pauses?
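>
> One thing I could try is enabling GC logging through env.java.opts in
> flink-conf.yaml - a hedged sketch, where the log path is a placeholder:
>
>     env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/var/log/flink/gc.log
>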
> These are some traces from the TaskManager log, before/after it detached
> from the JobManager:
>
> 2018-01-08 22:26:37,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476,
> GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP:
> 116/119/-1 MB (used/committed/max)]
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Direct memory stats: Count: 100, Total Capacity: 29942814,
> Used Memory: 29942815
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Off-heap pool stats: [Code Cache: 42/42/240 MB
> (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)],
> [Compressed Class Space: 8/8/1024 MB (used/committed/max)]
> 2018-01-08 22:26:42,264 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476,
> GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher
>                  - Detected unreachable: [akka.tcp://flink@ip-10-1-51-
> 209.cloud-internal.rovio.com:35341]
> 2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager
>                    - TaskManager akka://flink/user/taskmanager
> disconnects from JobManager akka.tcp://flink@ip-10-1-51-
> 209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no
> longer reachable
> 2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager
>                    - Cancelling all computations and discarding all cached
> data.
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Attempting to fail task externally Sink: Discarded
> events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Sink: Discarded events (4/4) (
> 50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager
> disconnects from JobManager akka.tcp://flink@ip-10-1-51-
> 209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no
> longer reachable
> at org.apache.flink.runtime.taskmanager.TaskManager.
> handleJobManagerDisconnect(TaskManager.scala:1095)
> at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at org.apache.flink.runtime.LogMessages$$anon$1.
> applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at org.apache.flink.runtime.taskmanager.TaskManager.
> aroundReceive(TaskManager.scala:120)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
> DeathWatch.scala:44)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Triggering cancellation of task code Sink: Discarded
> events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Attempting to fail task externally Sink: CounterSink
> (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
>
>
> *José Miguel Tejedor Fernández*
> Server developer
> jose.fernan...@rovio.com
>
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com
>
>
>
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> there is no known limitation in the strict sense, but you might run out
>> of DFS space or job manager memory if you keep around a huge number of
>> checkpoints. I wonder what reason you might have to ever want such a
>> huge number of retained checkpoints? Usually keeping one checkpoint should
>> do the job, maybe a couple more if you are very afraid of corruption
>> that goes beyond your DFS's capabilities to handle it. Is there any reason
>> for that, or maybe a misconception about what increasing the number of
>> retained checkpoints is good for?
>>
>> Best,
>> Stefan
>>
>>
>> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com>:
>>
>> Hi,
>>
>> Increasing akka's timeouts is rarely a solution to any problem - it
>> either does not help, or just masks the issue, making it less visible. But
>> yes, it is possible to bump the limits:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
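>>
>> A hedged example of the kind of entries from that page (the values are
>> placeholders, and the 20 s handshake timeout itself may not map directly to
>> a single key):
>>
>>     akka.ask.timeout: 60 s
>>     akka.tcp.timeout: 60 s
>>     akka.watch.heartbeat.pause: 120 s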
>>
>> I don't think that state.checkpoints.num-retained was meant to handle
>> such a large number of retained checkpoints, so maybe there are some
>> known/unknown limitations. Stefan, do you know something in this regard?
>>
>> In parallel, as for any other akka timeout, you should track down its
>> root cause. This one warning line doesn't tell much. Where does it come
>> from? The client log? The job manager log? The task manager log? Please
>> search on the other side of the timed-out connection for the possible root
>> cause of the timeout, including:
>> - possible errors/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural
>> gaps in the logs)
>> - machine health (CPU usage, disk usage, network connections)
>>
>> Piotrek
>>
>> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <
>> jose.fernan...@rovio.com> wrote:
>>
>> Hello,
>>
>> I have several streaming jobs running in production (v. 1.3.1) which
>> always fail after a fixed period of around 30 hours of execution. This is
>> the WARN trace before failing:
>>
>> Association with remote system 
>> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876] has failed, 
>> address is now gated for [5000] ms. Reason: [Association failed with 
>> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876]] Caused by: 
>> [No response from remote for outbound association. Handshake timed out after 
>> [20000 ms].
>>
>>
>> The main change made to the job configuration was to increase
>> state.checkpoints.num-retained from 1 to *2880*. I am using the asynchronous
>> RocksDB state backend to snapshot the state. (I attach some screenshots with
>> the checkpoint configuration from the web UI.)
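>>
>> For context, this is roughly how the job enables it (a simplified sketch in
>> Java; the class name, checkpoint path and interval are placeholders, not our
>> actual setup):
>>
>>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>     import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>>     public class CheckpointSetup {
>>         public static void main(String[] args) throws Exception {
>>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>             // take a checkpoint every minute
>>             env.enableCheckpointing(60_000);
>>             // the second constructor argument enables incremental RocksDB checkpoints
>>             env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
>>             // retain the checkpoint when the job is cancelled so it can be resumed later
>>             env.getCheckpointConfig().enableExternalizedCheckpoints(
>>                     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>             // ... job topology and env.execute() would follow here
>>         }
>>     }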
>>
>>
>>    - Could my assumption be correct that the increase of
>>    state.checkpoints.num-retained is causing the problem? Is there any known
>>    issue regarding this?
>>
>>
>>    - Besides, is there any way to increase the Akka handshake timeout
>>    from the current 20000 ms to a higher value? It may be convenient to
>>    increase the timeout to 1 minute instead.
>>
>>
>> BR
>>
>>
>> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at
>> 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>
>>
>>
>>
>>
>
>
