Hi Javier,

I will try to look into this, as I have not personally seen this problem
while using the operator.

It would be great if you could reach out to me on Slack or by email directly
so we can discuss the issue and get to the bottom of it.

Cheers,
Gyula

On Fri, 20 Jan 2023 at 23:53, Javier Vegas <jve...@strava.com> wrote:

> My issue is described in https://issues.apache.org/jira/browse/FLINK-21928
> which says it was fixed in 1.14, but I am still seeing the problem.
> However, it also says:
>
> "Additionally, it is still required that the user cleans up the
> corresponding HA entries for the running jobs registry because these
> entries won't be reliably cleaned up when encountering the situation
> described by FLINK-21928
> <https://issues.apache.org/jira/browse/FLINK-21928>."
>
> So I guess I need to do some manual cleanup of my S3 HA data before
> restarting.
>
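Before deleting anything by hand, it may help to see what is actually recorded. As far as I know, since Flink 1.15 finished jobs are tracked by the JobResultStore, which replaced the running jobs registry mentioned in FLINK-21928. As I understand the docs, its location is set by job-result-store.storage-path and defaults to {high-availability.storageDir}/job-result-store/{high-availability.cluster-id}. The Java sketch below is hypothetical and only lists entries. It assumes the storage directory is reachable as a local or mounted path (reading S3 directly would need an S3 client), and that entries are named <jobId>.json (clean) or <jobId>_DIRTY.json (dirty). Please verify both assumptions against the Flink version in use.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper, not a Flink API. ASSUMPTIONS: the HA storage dir is a
// local/mounted path, the default JobResultStore layout is used, and entries
// are named <jobId>.json (clean) or <jobId>_DIRTY.json (dirty).
class JobResultStoreInspector {

    static final String DIRTY_SUFFIX = "_DIRTY.json";
    static final String CLEAN_SUFFIX = ".json";

    /** Resolves the assumed default directory: <storageDir>/job-result-store/<cluster-id>. */
    static Path resultStoreDir(Path haStorageDir, String clusterId) {
        return haStorageDir.resolve("job-result-store").resolve(clusterId);
    }

    /** Returns the job IDs that have an entry (clean or dirty), sorted by file name. */
    static List<String> listJobIds(Path haStorageDir, String clusterId) throws IOException {
        Path dir = resultStoreDir(haStorageDir, clusterId);
        List<String> jobIds = new ArrayList<>();
        if (!Files.isDirectory(dir)) {
            return jobIds; // nothing recorded for this cluster-id
        }
        try (Stream<Path> files = Files.list(dir)) {
            files.map(p -> p.getFileName().toString())
                 .sorted()
                 .forEach(name -> {
                     // Check the dirty suffix first: "_DIRTY.json" also ends with ".json".
                     if (name.endsWith(DIRTY_SUFFIX)) {
                         jobIds.add(name.substring(0, name.length() - DIRTY_SUFFIX.length()));
                     } else if (name.endsWith(CLEAN_SUFFIX)) {
                         jobIds.add(name.substring(0, name.length() - CLEAN_SUFFIX.length()));
                     }
                     // Any other file is ignored.
                 });
        }
        return jobIds;
    }
}
```

The sketch deliberately does not delete anything; removing entries should only happen once the job is confirmed stopped and the HA data has been backed up.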
> On Fri, 20 Jan 2023 at 4:58, Javier Vegas (<jve...@strava.com>)
> wrote:
>
>>
>> I have a Flink app (Flink 1.16.0, deployed to Kubernetes via the Flink
>> Kubernetes Operator 1.3.1, using Kubernetes High Availability with storage
>> in S3) that depends on multiple Thrift services for data queries. When one
>> of those services is down (or throws exceptions), the Flink job managers
>> end up crashing and only the task managers remain up. Once the
>> dependencies are fixed and I try to restart the Flink app, I get a
>> "DuplicateJobSubmissionException: Job has already been submitted" (see
>> below for the detailed log) and the task managers never start. The only
>> solution I have found is to delete the deployment from Kubernetes and then
>> deploy it again as a new job.
>>
>> 1) Is there a better way to handle failures in dependencies than letting
>> the job managers crash while the task managers stay up, and restarting
>> after the dependencies are fixed?
>> 2) If not, is there a way to handle the DuplicateJobSubmissionException
>> so the Flink app can be restarted without having to uninstall it first?
>>
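On question 1, one general pattern (not specific to Flink or Thrift) is to retry calls to a flaky dependency with exponential backoff inside the operator, so a short outage slows the job down instead of failing it. The helper below is a hypothetical sketch: the names RetryingCall, callWithRetry, maxAttempts and initialBackoffMs are made up for illustration, and the 60-second cap is arbitrary. Blocking inside an operator causes backpressure, so Flink's restart strategies or its Async I/O API (which, as far as I know, gained retry support in recent releases) may be a better fit depending on the job.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not a Flink or Thrift API: retries a call with exponential backoff.
final class RetryingCall {

    private static final long MAX_BACKOFF_MS = 60_000L; // arbitrary cap, tune as needed

    private RetryingCall() {}

    /**
     * Invokes call up to maxAttempts times, sleeping between failed attempts and
     * doubling the sleep each time (capped at MAX_BACKOFF_MS). Rethrows the last
     * exception if every attempt fails.
     */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts, long initialBackoffMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoffMs = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs); // wait before the next attempt
                    backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
                }
            }
        }
        throw last; // non-null here: at least one attempt ran and failed
    }
}
```

For example, wrapping a (hypothetical) Thrift client call as callWithRetry(() -> client.query(request), 5, 500) would try up to five times, waiting 0.5, 1, 2 and 4 seconds between attempts, before letting the exception reach Flink's own failure handling.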
>> Thanks,
>>
>> Javier Vegas
>>
>>
>> org.apache.flink.util.FlinkException: Failed to execute job
>>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>> Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
>>         at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
>>         at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:449)
>>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>>         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>         at akka.actor.Actor.aroundReceive(Actor.scala:537)
>>         at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>>         ... 5 more
>> Exception thrown in main on startup
>
