My issue is described in https://issues.apache.org/jira/browse/FLINK-21928,
which says it was fixed in 1.14, but I am still seeing the problem.
However, the ticket also says:

"Additionally, it is still required that the user cleans up the
corresponding HA entries for the running jobs registry because these
entries won't be reliably cleaned up when encountering the situation
described by FLINK-21928 <https://issues.apache.org/jira/browse/FLINK-21928>."


So I guess I need to do some manual cleanup of my S3 HA data before
restarting.
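For the manual cleanup: as far as I can tell, since Flink 1.15 the running jobs registry mentioned in that quote has been replaced by the JobResultStore (FLIP-194), and `DuplicateJobSubmissionException.ofGloballyTerminated` in the trace below means the Dispatcher found an existing result entry for the job ID. This is a minimal sketch that only builds the S3 paths I would expect to inspect, assuming `job-result-store.storage-path` is left at its default of `{high-availability.storageDir}/job-result-store/{high-availability.cluster-id}` (on Kubernetes I believe the cluster ID is taken from `kubernetes.cluster-id`) and that entries are named `<jobId>.json` or `<jobId>_DIRTY.json`. The class name and all argument values are hypothetical placeholders, so please check the actual bucket contents before deleting anything.

```java
import java.util.List;

/**
 * Hypothetical helper, not part of Flink: builds the paths where the
 * file-system JobResultStore is ASSUMED to keep entries when
 * job-result-store.storage-path is left at its default. It only
 * concatenates strings; it never talks to S3.
 */
public final class JobResultStorePaths {

    private JobResultStorePaths() {}

    /** Assumed default store directory: {storageDir}/job-result-store/{clusterId}. */
    public static String storeDir(String haStorageDir, String clusterId) {
        // Drop a trailing slash so we don't produce ".../ha//job-result-store".
        String base = haStorageDir.endsWith("/")
                ? haStorageDir.substring(0, haStorageDir.length() - 1)
                : haStorageDir;
        return base + "/job-result-store/" + clusterId;
    }

    /** Assumed entry file names for one job: committed and not-yet-cleaned ("dirty"). */
    public static List<String> entryFiles(String haStorageDir, String clusterId, String jobId) {
        String dir = storeDir(haStorageDir, clusterId);
        return List.of(dir + "/" + jobId + ".json", dir + "/" + jobId + "_DIRTY.json");
    }
}
```

For example, `JobResultStorePaths.entryFiles("s3://my-bucket/flink-ha/", "my-flink-app", jobId)` returns the two candidate keys for that (placeholder) bucket, cluster ID and job ID.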

On Fri, 20 Jan 2023 at 4:58, Javier Vegas (<jve...@strava.com>) wrote:

>
> I have a Flink app (Flink 1.16.0, deployed to Kubernetes via operator
> 1.3.1 and using Kubernetes High Availability with storage in S3) that
> depends on multiple Thrift services for data queries. When one of those
> services is down (or throws exceptions), the Flink job managers end up
> crashing and only the task managers remain up. Once the dependencies are
> fixed, when I try to restart the Flink app I end up with a
> "DuplicateJobSubmissionException: Job has already been submitted" (see
> below for detailed log) and the task managers never start. The only
> solution I have found is to delete the deployment from Kubernetes and then
> deploy again as a new job.
>
> 1) Is there a better way to handle failures in dependencies than letting
> the job managers crash (while the task managers stay up) and restarting
> once the dependencies are fixed?
> 2) If not, is there a way to handle the DuplicateJobSubmissionException so
> the Flink app can be restarted without having to uninstall it first?
>
> Thanks,
>
> Javier Vegas
>
>
> org.apache.flink.util.FlinkException: Failed to execute job
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
> Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
>         at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
>         at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:449)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>         at akka.actor.Actor.aroundReceive(Actor.scala:537)
>         at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>         ... 5 more
> Exception thrown in main on startup
>
>
>
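On question 1: rather than letting a Thrift outage fail the job (and eventually take the job managers down), one common pattern is to retry the remote call inside the operator with bounded exponential backoff, so short outages are absorbed before Flink's restart strategy is even involved. This is a minimal, framework-free sketch under that assumption; `BackoffRetry` and its parameters are hypothetical (not a Flink API), and the `Callable` stands in for whatever Thrift client call the job actually makes.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/**
 * Hypothetical helper, not a Flink API: retries a flaky remote call with
 * bounded exponential backoff and rethrows the last failure once all
 * attempts are used up.
 */
public final class BackoffRetry {

    private BackoffRetry() {}

    /** Delay before the retry that follows failed attempt `attempt` (0-based): base * 2^attempt, capped at max. */
    public static Duration delayFor(int attempt, Duration base, Duration max) {
        // Clamp the shift so the multiplier cannot overflow.
        long factor = 1L << Math.min(attempt, 30);
        Duration delay = base.multipliedBy(factor);
        return delay.compareTo(max) > 0 ? max : delay;
    }

    /** Runs `call` up to maxAttempts times, sleeping between failures. */
    public static <T> T call(Callable<T> call, int maxAttempts, Duration base, Duration max)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                // Only sleep if another attempt will follow.
                if (attempt + 1 < maxAttempts) {
                    Thread.sleep(delayFor(attempt, base, max).toMillis());
                }
            }
        }
        throw last;
    }
}
```

Inside a `map` function this would look something like `BackoffRetry.call(() -> client.query(key), 5, Duration.ofMillis(200), Duration.ofSeconds(10))`, where `client` is your own (hypothetical here) Thrift client. Note that `Thread.sleep` blocks the task thread, so a long outage shows up as backpressure, and once the retries are exhausted the exception still propagates and the job fails as before.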

Reply via email to