Hi Slim,

Regarding your initial question about the size of _metadata: when Flink writes a checkpoint, it assumes some kind of DFS as the checkpoint storage, and pretty much all known DFS implementations behave poorly when dealing with many small files. If you run a job with 5 tasks at a parallelism of 120, you already end up with 600 small checkpoint files (or more, depending on the configuration).

To avoid that, Flink inlines files that are smaller than a configurable threshold into the _metadata file [1]. However, these small inlined files can quickly add up, which is why your _metadata grows so large. You can disable that behavior by setting the threshold to 0.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#advanced-state-backends-options
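For reference, a minimal flink-conf.yaml sketch of the option discussed here (the values are only illustrative, not a recommendation for your setup):

# State handles smaller than this threshold are sent to the JM and inlined
# into the _metadata file instead of being written as separate files.
# 0 disables the inlining, so every handle becomes its own file on the DFS.
state.backend.fs.memory-threshold: 0
# Alternatively, keep the inlining but lower the threshold (as Yun suggests below):
# state.backend.fs.memory-threshold: 1kb

Note that with a threshold of 0 you trade the large _metadata file for many small files on the DFS again, so the right value depends on which side hurts more in your environment.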
On Thu, Nov 19, 2020 at 12:57 AM Slim Bouguerra <slim.bougue...@gmail.com> wrote:

> Hi Yun,
> Thanks for the help. After applying your recommendation I am still getting
> the same issue, i.e. very long checkpoints and then a timeout.
> My guess now is that the datagen source is pushing its checkpoint over the
> network to the JM. Is there a way to double-check that?
> If that is the case, is there a way to exclude the source operators from
> the checkpoints?
> Thanks.
> Please find the attached logs.
> 1. I checked the shared folder and it has the shared operator state.
> 2. I did set the value of fs-memory-threshold to 1kb.
>
> This is the source of the SQL testing job:
>
> CREATE TABLE datagen (
>     f_sequence INT,
>     f_random INT,
>     f_random_str STRING,
>     f_random_str_4 STRING,
>     f_random_str_3 STRING,
>     f_random_str_2 STRING,
>     f_random_str_1 STRING,
>     ts AS localtimestamp,
>     WATERMARK FOR ts AS ts
> ) WITH (
>     'connector' = 'datagen',
>     -- optional options --
>     'rows-per-second'='500000',
>     'fields.f_sequence.kind'='sequence',
>     'fields.f_sequence.start'='1',
>     'fields.f_sequence.end'='200000000',
>     'fields.f_random.min'='1',
>     'fields.f_random.max'='100',
>     'fields.f_random_str.length'='100000',
>     'fields.f_random_str_4.length'='100000',
>     'fields.f_random_str_3.length'='100000',
>     'fields.f_random_str_2.length'='100000',
>     'fields.f_random_str_1.length'='100000'
> );
>
> ---------------------------------------
> With more debugging I see this exception stack on the job manager:
>
> java.io.IOException: The rpc invocation size 199965215 exceeds the maximum akka framesize.
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at com.sun.proxy.$Proxy25.acknowledgeCheckpoint(Unknown Source) [?:?]
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder.acknowledgeCheckpoint(RpcCheckpointResponder.java:46) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.state.TaskStateManagerImpl.reportTaskStateSnapshots(TaskStateManagerImpl.java:117) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:160) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:121) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
>
> ----------------------------------------------
> And sometimes the JM dies with this OOM:
>
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236) ~[?:1.8.0_172]
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[?:1.8.0_172]
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[?:1.8.0_172]
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[?:1.8.0_172]
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[?:1.8.0_172]
>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) ~[?:1.8.0_172]
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) ~[?:1.8.0_172]
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_172]
>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.serialization.JavaSerializer.toBinary(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:53) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:906) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:906) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:905) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter.writeSend(Endpoint.scala:793) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter.delegate$1(Endpoint.scala:682) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter.writeLoop$1(Endpoint.scala:693) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter.sendBufferedMessages(Endpoint.scala:706) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointWriter$$anonfun$3.applyOrElse(Endpoint.scala:637) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> On Wed, Nov 18, 2020 at 12:16 AM Yun Tang <myas...@live.com> wrote:
>
>> Hi Slim,
>>
>> You could check the taskmanager logs to see whether incremental
>> checkpointing is really enabled (or check whether files exist under
>> /opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/shared to
>> judge). If RocksDB with incremental checkpointing is really enabled, I
>> think the large metadata size is caused by the memory threshold [1],
>> which sends the data back to the JM directly in byte format if a state
>> handle is smaller than that threshold. Try decreasing this value to
>> '1 kb' to see whether the size of the metadata also decreases.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-fs-memory-threshold
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Slim Bouguerra <bs...@apache.org>
>> *Sent:* Wednesday, November 18, 2020 6:16
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* Job Manager is taking very long time to finalize the Checkpointing.
>>
>> Originally posted to the dev list.
>> ---------- Forwarded message ---------
>> From: *Slim Bouguerra* <bs...@apache.org>
>> Date: Tue, Nov 17, 2020 at 8:09 AM
>> Subject: Job Manager is taking very long time to finalize the Checkpointing.
>> To: <d...@flink.apache.org>
>>
>> Hi Devs,
>> I am very new to the Flink code base and I am working on an evaluation of
>> the checkpointing strategy.
>>
>> In my current setup I am using an NFS-based file system as the checkpoint
>> store (the NAS/NFS has a very high throughput, over 2 GB/s on one node,
>> and I am using 12 NFS servers). When pushing the system to a relatively
>> medium scale, i.e. 120 subtasks over 6 workers with a total state of
>> 100 GB, I observe that the Job Manager takes over 2 minutes to finalize
>> the checkpoint (observed both in the UI and via CPU profiling of the JM;
>> see the flame graph of a 30-second sample). As you can see from the
>> attached flame graphs, the JM is very busy serializing the metadata
>> (org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.serializeOperatorState,
>> 2,875 samples, 99.65%).
>>
>> Now the question is why this metadata file is so big, in the order of
>> 3 GB in my case. How does this size scale? num_of_tasks * num_states?
>>
>> /opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/chk-1
>> bash-4.2$ ls -all -h
>> -rw-r--r-- 1 flink flink 3.0G Nov 17 01:42 _metadata
>>
>> The second question is how to better measure the time taken by the JM to
>> commit the transaction, i.e. time_done_checkpoint - time_got_all_acks_from_tm.
>> Is there a config flag I am missing to make this last step faster?
>>
>> My current configs for checkpoints:
>>
>> state.backend: rocksdb
>> # See the PV mount path, needs to be the same as <mountPath: "/opt/flink/pv">
>> state.checkpoints.dir: file:///opt/flink/pv/checkpoints
>> state.savepoints.dir: file:///opt/flink/pv/savepoints
>> state.backend.incremental: true
>> # https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#checkpointing
>> execution.checkpointing.interval: 60000
>> execution.checkpointing.mode: AT_LEAST_ONCE
>> # hitting "The rpc invocation size 19598830 exceeds the maximum akka framesize"
>> akka.framesize: 100485760b
>> # https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#heartbeat-timeout
>> heartbeat.timeout: 70000
>> # https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout
>> execution.checkpointing.timeout: 15minutes
>>
>> Some metadata about the checkpoint:
>>
>> {"@class":"completed","id":1,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1605315046120,"latest_ack_timestamp":1605315093466,"state_size":12239786229,"end_to_end_duration":47346,"alignment_buffered":0,"num_subtasks":120,"num_acknowledged_subtasks":120,"tasks":{},"external_path":"file:/opt/flink/pv/checkpoints/7474752476036c14d7fdeb4e86af3638/chk-1"}
>
> --
>
> B-Slim
> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______

--
Arvid Heise | Senior Java Developer <https://www.ververica.com/>

Follow us @VervericaData

--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng