This makes total sense and is actually smart (defensive). Will test and report. I do think, though, that this needs to be documented :)
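For anyone else who trips over this: the fix on the client side is to actually consume the async result. Below is a minimal sketch of what I mean (not our production client; the JobManager address, job id, and savepoint target directory are placeholders, and the endpoints are the async savepoint trigger/status calls as I read them in the 1.8 REST docs). It triggers cancel-with-savepoint and then polls the trigger id until the savepoint path is delivered, so the JobManager isn't left waiting out the 5-minute delivery timeout:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch of a client that triggers cancel-with-savepoint via REST and then
// polls the async operation until the savepoint path has been delivered.
// Host/port, job id, and target directory below are placeholders.
public class CancelWithSavepointClient {

    private static final String BASE = "http://jobmanager:8081"; // placeholder

    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000";       // placeholder
        String body =
            "{\"target-directory\": \"s3://some-bucket/savepoints\", \"cancel-job\": true}";

        // Trigger the asynchronous operation. The response carries only a
        // request id, not the savepoint path itself.
        String trigger = post(BASE + "/jobs/" + jobId + "/savepoints", body);
        String requestId = extract(trigger, "request-id");

        // Poll until completion. Skipping this step is what leaves the
        // JobManager waiting (up to 5 minutes) for the result to be picked up.
        while (true) {
            String status = get(BASE + "/jobs/" + jobId + "/savepoints/" + requestId);
            if (status.contains("COMPLETED")) {
                System.out.println("Savepoint at: " + extract(status, "location"));
                return;
            }
            Thread.sleep(1000L); // a real client would also handle failure states
        }
    }

    private static String post(String url, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return read(conn);
    }

    private static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        return read(conn);
    }

    private static String read(HttpURLConnection conn) throws IOException {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
        }
        return sb.toString();
    }

    // Crude JSON field extraction; good enough for a sketch.
    private static String extract(String json, String field) {
        int keyIdx = json.indexOf("\"" + field + "\"");
        int start = json.indexOf('"', json.indexOf(':', keyIdx) + 1) + 1;
        return json.substring(start, json.indexOf('"', start));
    }
}

If the process that issued the trigger dies before that poll completes (our case, effectively a fire-and-forget REST call), the 5-minute hang and exit code 2 below are exactly what Till describes.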
On Wed, Apr 24, 2019 at 6:03 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Vishal,
>
> it seems that the following is happening: you triggered the cancel-with-savepoint
> command via the REST call. This command is an asynchronous operation which produces
> a result (the savepoint path). In order to deliver asynchronous results to the
> caller, Flink waits before shutting down until they are delivered, or until it
> times out after 5 minutes. I assume that you don't request the savepoint path from
> Flink via the URL returned by the original request. This could happen either if
> you kill the CLI before it's done or if you have written your own method to
> trigger this operation.
>
> I guess we could add a flag for asynchronous operations which tells Flink that
> their results don't need to be delivered to a client. If you would like to have
> such a feature, please open a JIRA issue for it.
>
> Cheers,
> Till
>
> On Wed, Apr 24, 2019 at 3:49 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Anyone?
>>
>> I think there is some race condition. These are the TM logs. I am puzzled because
>> in a larger pipe (there are about 32 slots on 8 replicas) it works.
>>
>> 2019-04-24 01:16:20,889 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Releasing local state under allocation id 5a853ef886e1c599f86b9503306fffd2.
>> 2019-04-24 01:16:20,894 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager connection for job 00000000000000000000000000000000.
>> org.apache.flink.util.FlinkException: Stopping JobMaster for job EventCountJob(00000000000000000000000000000000).
>>     at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 2019-04-24 01:16:20,895 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect to job 00000000000000000000000000000000 because it is not registered.
>> 2019-04-24 01:16:21,053 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from e61c2b7d992f151936e21db1ca06666d.
>> 2019-04-24 01:16:22,136 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>> 2019-04-24 01:16:31,052 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from e61c2b7d992f151936e21db1ca06666d.
>> 2019-04-24 01:16:35,483 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>>
>> On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> I see this in the TM pod:
>>>
>>> 2019-04-23 19:08:41,828 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>> 2019-04-23 19:08:47,543 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>> 2019-04-23 19:08:55,175 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x15cc7f3d88466a5 after 1ms
>>> 2019-04-23 19:08:57,548 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>> 2019-04-23 19:09:07,543 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>> 2019-04-23 19:09:08,523 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>> 2019-04-23 19:09:17,542 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>> 2019-04-23 19:09:21,871 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>> 2019-04-23 19:09:27,543 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>> 2019-04-23 19:09:35,218 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>> 2019-04-23 19:09:37,542 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>>
>>> The JM log is analogous:
>>>
>>> 2019-04-23 19:10:49,218 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x25add5478fb2e7c after 0ms
>>>
>>> Does that ring a bell?
>>>
>>> On Tue, Apr 23, 2019 at 2:04 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Adding the DEBUG logs from the time I call a REST-based cancel with savepoint...
>>>>
>>>> On Tue, Apr 23, 2019 at 2:01 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Though looking at
>>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java#L88
>>>>> it does seem that the last log is expected.
>>>>>
>>>>> Not sure what part is hanging... I have more logs I can share...
>>>>>
>>>>> On Tue, Apr 23, 2019 at 1:59 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> I am seeing this weird issue where I do a savepoint with cancel on a job on
>>>>>> k8s; it hangs for 5 minutes (no INFO logs) and then exits with exit code 2.
>>>>>>
>>>>>> 2019-04-23 17:36:31,372 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Shutting down rest endpoint.
>>>>>> 2019-04-23 17:36:31,374 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>>> 2019-04-23 17:36:31,377 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Suspending SlotPool.
>>>>>> 2019-04-23 17:36:31,378 DEBUG org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager connection 181a4fd61044033a2ea32e384096247f.
>>>>>> org.apache.flink.util.FlinkException: JobManager is shutting down.
>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:365)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> 2019-04-23 17:36:31,381 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Stopping SlotPool.
>>>>>> 2019-04-23 17:36:31,381 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect job manager a41a5dceae5ad3664ff1f0b79f3e47ef@akka.tcp://flink@kafka-to-prometheus:6123/user/jobmanager_0 for job 00000000000000000000000000000000 from the resource manager.
>>>>>> 2019-04-23 17:36:31,385 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/00000000000000000000000000000000/job_manager_lock'}.
>>>>>>
>>>>>> and after 5 minutes...
>>>>>> >>>>>> >>>>>> >>>>>> 019-04-23 17:41:32,187 DEBUG >>>>>> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - >>>>>> Freed 8 thread-local buffer(s) from thread: Finalizer >>>>>> >>>>>> 2019-04-23 17:41:32,198 INFO >>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcService - >>>>>> Stopped Akka RPC service. >>>>>> >>>>>> 2019-04-23 17:41:32,200 INFO >>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint >>>>>> - Terminating cluster entrypoint process >>>>>> StandaloneJobClusterEntryPoint with exit code 2. >>>>>> >>>>>> java.util.concurrent.TimeoutException >>>>>> >>>>>> at >>>>>> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:942) >>>>>> >>>>>> at >>>>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) >>>>>> >>>>>> at >>>>>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$11(FutureUtils.java:360) >>>>>> >>>>>> at >>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> In the interim, I get this at a regular clip >>>>>> >>>>>> >>>>>> >>>>>> 2019-04-23 17:37:02,452 DEBUG >>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - >>>>>> Release TaskExecutor 3752235c49428b94a0520f04266973eb because it exceeded >>>>>> the idle timeout. >>>>>> >>>>>> 2019-04-23 17:37:02,453 DEBUG >>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>> Worker 68c5fbd67ac2bbe6fc35ed068ce1c4b1 could not be stopped. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> Any ideas as to whether it is this inability to shut down the Worker >>>>>> that is causing this issue ? >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> Regards. >>>>>> >>>>>