Hi Matthias,
Thanks for suggesting a workaround, but our jobs fail with the error below when I
apply that change:
Caused by: java.lang.IllegalArgumentException: TaskAttemptId string : attempt__0000_r_000001_1632168799349 is not properly formed
    at org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
    at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:59)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:745)
Am I missing something here?
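If it helps, here is my rough understanding of why that suffix breaks, sketched as a
minimal standalone test. This is only a guess, and the class name is just for
illustration: I am assuming TaskAttemptID.forName() parses the trailing attempt
counter with Integer.parseInt(), in which case a 13-digit millisecond timestamp does
not fit into an int and the whole string gets rejected as "not properly formed".

    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Hypothetical probe class, not part of our job code.
    public class AttemptIdProbe {
        public static void main(String[] args) {
            // Parses fine: the trailing attempt counter fits into an int.
            System.out.println(TaskAttemptID.forName("attempt__0000_r_000001_0"));

            // Throws IllegalArgumentException ("... is not properly formed"),
            // presumably because 1632168799349 exceeds Integer.MAX_VALUE.
            System.out.println(
                    TaskAttemptID.forName("attempt__0000_r_000001_" + System.currentTimeMillis()));
        }
    }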
Thanks,
Siddharth
From: Matthias Pohl <[email protected]>
Sent: Monday, September 20, 2021 4:54 AM
To: Shah, Siddharth [Engineering] <[email protected]>
Cc: [email protected]; Hailu, Andreas [Engineering]
<[email protected]>
Subject: Re: hdfs lease issues on flink retry
I don't know of any side effects of your approach. But another workaround I saw
was replacing the _0 suffix with something like "_" + System.currentTimeMillis().
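Roughly something like this, as an untested sketch against HadoopOutputFormatBase#open()
(I'm using %06d for the zero padding instead of the format trick that is already there):

    // Sketch only: keep the padded task number, but replace the fixed _0 attempt
    // suffix with the current time in milliseconds.
    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
            + String.format("%06d", taskNumber + 1)
            + "_" + System.currentTimeMillis());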
On Fri, Sep 17, 2021 at 8:38 PM Shah, Siddharth
<[email protected]<mailto:[email protected]>> wrote:
Hi Matthias,
Thanks for looking into the issue and creating a ticket. I am thinking of
putting a workaround in place until the issue is fixed.
What if I create the attempt directories with a random int by patching
HadoopOutputFormatBase’s open() method?
Original:

    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
            // pad (taskNumber + 1) to six digits with leading zeros
            + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
            + Integer.toString(taskNumber + 1)
            // attempt counter is fixed at 0, so every retry reuses the same directory
            + "_0");

Patched:

    // random prefix (0-998), zero-padded to four digits, so each try gets its own attempt directory
    int attemptRandomPrefix = new Random().nextInt(999);
    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__"
            + String.format("%" + (4 - Integer.toString(attemptRandomPrefix).length()) + "s", " ").replace(" ", "0")
            + Integer.toString(attemptRandomPrefix) + "_r_"
            // pad (taskNumber + 1) to six digits with leading zeros
            + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
            + Integer.toString(taskNumber + 1)
            + "_0");
So basically I am creating a directory named attempt__0123_r_000001_0 instead of
attempt__0000_r_000001_0. I have tested it on a handful of our jobs and it seems to
be working fine. I just wanted to check: is there any downside to this change that I
may not be aware of?
Thanks,
Siddharth
From: Matthias Pohl <[email protected]>
Sent: Tuesday, September 07, 2021 5:06 AM
To: Shah, Siddharth [Engineering] <[email protected]>
Cc: [email protected]; Hailu, Andreas [Engineering] <[email protected]>
Subject: Re: hdfs lease issues on flink retry
Just for documentation purposes: I created FLINK-24147 [1] to cover this issue.
[1]
https://issues.apache.org/jira/browse/FLINK-24147
On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl
<[email protected]<mailto:[email protected]>> wrote:
I see - I should have checked my mailbox before answering. I received the email
and was able to log in.
On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl
<[email protected]<mailto:[email protected]>> wrote:
The link doesn't work, i.e. I'm redirected to a login page. It would also be
good to include the Flink logs and make them accessible for everyone. This way
others could share their perspective as well...
On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering]
<[email protected]<mailto:[email protected]>> wrote:
Hi Matthias,
Thank you for responding and taking the time to look at the issue.
I have uploaded the YARN logs here:
https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/
and have also requested read permissions for you. Please let us know if you’re
not able to see the files.
From: Matthias Pohl <[email protected]>
Sent: Thursday, August 26, 2021 9:47 AM
To: Shah, Siddharth [Engineering] <[email protected]>
Cc: [email protected]; Hailu, Andreas [Engineering] <[email protected]>
Subject: Re: hdfs lease issues on flink retry
Hi Siddharth,
thanks for reaching out to the community. This might be a bug. Could you share
your Flink and YARN logs? This way we could get a better understanding of
what's going on.
Best,
Matthias
On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering]
<[email protected]<mailto:[email protected]>> wrote:
Hi Team,
We are seeing transient failures, mostly in jobs that require higher resources and
use Flink RestartStrategies [1]. Upon checking the YARN logs, we have observed HDFS
lease issues when a Flink retry happens. The job originally fails on the first try
with PartitionNotFoundException or NoResourceAvailableException, but on retry it
seems from the YARN logs that the lease on the temp sink directory has not yet been
released by the node from the previous try.
Initial failure log message:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots to run the job. Please make sure that the cluster has enough resources.
    at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
Retry failure log message:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/p2epda/lake/delp_prod/PROD/APPROVED/data/TECHRISK_SENTINEL/INFORMATION_REPORT/4377/temp/data/_temporary/0/_temporary/attempt__0000_r_000003_0/partMapper-r-00003.snappy.parquet for client 10.51.63.226 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2815)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
I could verify that it is the same nodes from the previous try that own the lease,
and I checked multiple jobs by matching IP addresses. Ideally, we want the internal
retry to succeed, since there will be thousands of jobs running at a time and it is
hard to retry them manually.
This is our current restart config:
    executionEnv.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
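For completeness, here is a self-contained sketch of that setup (the class name is
just for illustration; our actual pipeline code is omitted):

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.ExecutionEnvironment;

    // Hypothetical example class, not our production job.
    public class RestartConfigExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment executionEnv = ExecutionEnvironment.getExecutionEnvironment();
            // Retry the whole job up to 3 times, waiting 10 seconds between attempts.
            executionEnv.setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
            // ... DataSet pipeline definition and executionEnv.execute(...) would go here ...
        }
    }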
Is it possible to resolve the leases before a retry? Or is it possible to use
different sink directories (incrementing the attempt id somewhere) for every retry,
so that we avoid the lease issues altogether? Or do you have any other suggestion
for resolving this?
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy
Thanks,
Siddharth
--
Matthias Pohl | Engineer
Follow us @VervericaData
Ververica <https://www.ververica.com/>
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner