Hi, I think the checkpointing is not the root cause of your job failure. As the log describes, your job failed caused by the authorization issue of Kafka. "Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed."
Best, Yangze Guo On Fri, May 7, 2021 at 11:29 PM sudhansu jena <sudhansu.jena...@gmail.com> wrote: > > Hi Team, > > We have recently enabled checking pointing using FsStateBackend where we are > trying to use S3 bucket as the persistent storage but after enabling it we > are running into issues while submitting the job into the cluster. > > Can you please let us know if we are missing anything ? > > > Below is the code sample for enabling Checkpointing. > > env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/fhirmapper")); > env.enableCheckpointing(1000); > > > > Below logs for the issue. > > > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, > backoffTimeMS=15000) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) > at jdk.internal.reflect.GeneratedMethodAccessor366.invoke(Unknown Source) > at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown > Source) > at java.base/java.lang.reflect.Method.invoke(Unknown Source) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:517) > at akka.actor.Actor.aroundReceive$(Actor.scala:515) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.kafka.common.errors.TransactionalIdAuthorizationException: > Transactional Id authorization failed. > > > Thanks, > Sudhansu > >