Hi Frederic,

I’ve once (upon a time 😊) had a similar situation when we changed from Flink 
1.8 to Flink 1.13 … It took me a long time to figure out.
Some hints where to start to look:

  *   _metadata file is used for
     *   Job manager state
     *   Smallish keyed state (in order to avoid too many small state files)
     *   Operator state (non-keyed)
  *   Does the operator that is getting blocked in initialization use operator 
state?
     *   Look for some condition that might cause it growing
     *   In my case back then, a minor condition caused the operator state 
being duplicated per operator parallelism when loading from a savepoint, which 
caused exponential growth per savepoint cycle
  *   You can obtain a local copy of this savepoint and try to load it by means 
of the state-processor-api
     *   Breaking into the debugger, at some point the _metadata file gets 
loaded and allows to determine which state actually had the run-away and what 
might have caused duplication

I hope this helps

Thias



From: Frederic Leger <frederic.le...@asklocala.com>
Sent: Monday, August 28, 2023 12:30 PM
To: user@flink.apache.org
Subject: Checkpoint/savepoint _metadata

⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠


Hi team,

We use flink 1.16.0 with openjdk-11-jre mainly to run streaming jobs.
We do checkpoints with 2 min interval and savepoint when deploying new job 
version.
We also use rocksdb state backend for most of them.

We had a streaming job running for long without any issue and during a new 
deployment we could not launch it anymore, it was getting stuck on CREATING on 
one task, then was failing and restarting and so on.
In this Flink job, we handle a large data stream using key-based grouping. 
Inside a processFunction, we use MapState[Long, String] as our state storage, 
which keeps data with associated time limits (TTL) of 30 days.

The most relevant error we got from the logs was :

2023-08-02 12:40:52,186 ERROR akka.remote.EndpointWriter                        
           [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@10.1.1.5:31336/user/rpc/taskmanager_0#1435767402<http://flink@10.1.1.5:31336/user/rpc/taskmanager_0#1435767402>]:
 max allowed size 10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 42582878 bytes.

The solution to solve this issue was to increase akka.framesize from default 
(10MB) to 50MB
akka.framesize: 52428800b

After 16h of uptime, we wanted to move back the job to its initial cluster as 
it was running fine since then, but after the savepoint done, we could not 
launch it back and got this error :

2023-08-03 08:49:06,474 ERROR akka.remote.EndpointWriter                        
           [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@10.1.1.5:31358/user/rpc/taskmanager_0#1492669447<http://flink@10.1.1.5:31358/user/rpc/taskmanager_0#1492669447>]:
 max allowed size 52428800 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 679594586 bytes.

After some research it seems to be related to _metadata file written when 
checkpointing/savepointing and this file has grown up amazingly in the past 16h 
from 50MB to more than 600MB if we compare the first ERROR and the last one.

Since then we were unable to launch back the job.

Increasing akka.framesize from 50MB to 1GB permit to avoid the above errors, 
but one task was remaining in CREATING state until failure.
We started to get java.lang.OutOfMemoryError: Java heap space on the 
jobmanager, then timeout between the taskmanagers and jobmanager.
The heap size set to avoid the OOM on the jobmanager was from 2GB to 20GB.
Increasing timeouts lead to other errors, like java.lang.OutOfMemoryError: Java 
heap space on the taskmanagers and so on to finally timeout and fail.

2023-08-03 10:28:32,191 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - The heartbeat 
of ResourceManager with id be6b26eb0a0a54e636c9fbfc5f9815f3 timed out.
2023-08-03 10:28:32,191 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
ResourceManager connection be6b26eb0a0a54e636c9fbfc5f9815f3.
2023-08-03 10:28:32,191 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to 
ResourceManager 
akka.tcp://flink@10.1.1.3:46899/user/rpc/resourcemanager_0(a6fef33bff489d7e860c1017d2a34f50)<http://flink@10.1.1.3:46899/user/rpc/resourcemanager_0(a6fef33bff489d7e860c1017d2a34f50)>.
2023-08-03 10:28:38,411 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - The heartbeat 
of JobManager with id 39d49002792d881da6a5e7266c8ee58b timed out.
2023-08-03 10:28:38,412 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
JobManager connection for job f89626c718a36aa0240f8746b8b5a690.

As far as we understand, metadata should never become that huge unless we are 
doing something wrong(?).
Does someone have an idea on how we can diagnose this ?
We have kept a copy of the culprit metadata files and are in the process of 
taking a look at their content, but don't have a precise idea of what we should 
look for.

Any suggestions are welcome.
Thx

________________________________
You received this electronic message as part of a business or employment 
relationship with one or several Ask Locala entities. Its content is strictly 
confidential and is covered by the obligation of confidentiality and business 
secrecy. Any dissemination, copying, printing distribution, retention or use of 
the message’s content or any attachments that could be detrimental to Ask 
Locala is forbidden, even if it was forwarded by mailing lists.

If you are not the intended recipient, please notify the sender of the error 
without delay and delete permanently this email and any files from your system 
and destroy any printed copies.
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.

Reply via email to