Re: Execution of action: JobModelVersionChange failed.
Deshpande, Omkar wrote:
> We are using Beam with the Samza runner - beam.version 2.19.0, samza.version 1.3.0
>
> And we are seeing the following exception frequently. Should we be tweaking
> some configuration? Does this point to any network connectivity issue?
>
> 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime -
> Execution of action: JobModelVersionChange failed.
> java.lang.IllegalStateException: ZkClient already closed!
Re: Execution of action: JobModelVersionChange failed.
Hey Bharath,

I probably know what is taking so long in my shutdown sequence. My code roughly looks like this: https://gist.github.com/omkardeshpande8/dc4259a8aa7a726a4fe787d9ece8f44a

I think the Thread.sleep(3) in this code is increasing the time taken by the shutdown sequence. Is that correct?

So what does stopping a Samza container mean in the context of a Beam pipeline? Does pres.waitUntilFinish() return when the Samza container is being stopped?

We added Runtime.getRuntime().halt() because, in a lot of cases, the JVM running our Beam pipeline was hanging instead of exiting. Do you have suggestions for a better way to handle this?

And does a rebalance always involve stopping all the containers? We are running on K8s, and the pods are often moved around. Every time a pod is moved, a rebalance is triggered, and that rebalance in turn restarts all the other pods.

On 3/22/20, 10:29 PM, "Bharath Kumara Subramanian" wrote:
> Hi Omkar,
> The errors are related to timeouts during shutdown which gets triggered during
> a rebalance. Whenever a new processor joins the quorum or leaves the quorum, a
> rebalance is triggered which requires all the existing processors to shutdown
> its container before agreeing on the new job model. In your case, it looks like
> the container is taking beyond the configured timeout (task.shutdown.ms) and
> hence throwing an exception. Do you know what is taking so long in your
> shutdown sequence?
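Omkar's hypothesis, that a sleep on the shutdown path adds directly to the shutdown time, can be checked by timing the shutdown steps and comparing the total against the shutdown budget. The following is a minimal sketch under stated assumptions, not the code from the gist: the `ShutdownBudget` class, the `Step` interface, the 100 ms sleep and the budget values are all hypothetical, and the budget merely plays the role of task.shutdown.ms.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (not Samza or Beam code): runs a list of shutdown
 * steps in order, measures the total wall-clock time, and compares it with
 * a budget that stands in for task.shutdown.ms.
 */
public final class ShutdownBudget {
  private ShutdownBudget() {}

  /** One blocking step of a shutdown sequence, e.g. a flush or a sleep. */
  @FunctionalInterface
  interface Step {
    void run() throws InterruptedException;
  }

  /** Runs every step in order and returns the elapsed time in milliseconds. */
  static long runAndMeasureMs(List<Step> steps) {
    long start = System.nanoTime();
    try {
      for (Step step : steps) {
        step.run();
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status for the caller, then surface the failure.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("shutdown step interrupted", e);
    }
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  /** True if the whole sequence finished within budgetMs. */
  static boolean fitsBudget(List<Step> steps, long budgetMs) {
    return runAndMeasureMs(steps) <= budgetMs;
  }

  public static void main(String[] args) {
    // The sleep is counted in full against the budget.
    List<Step> withSleep = Arrays.asList(() -> Thread.sleep(100));
    List<Step> withoutSleep = Arrays.asList(() -> { });
    System.out.println(fitsBudget(withSleep, 10));      // false
    System.out.println(fitsBudget(withoutSleep, 5000)); // true
  }
}
```

Because the measurement covers every blocking call made during the sequence, any sleep there is charged in full against the budget. Whether Samza measures container shutdown in exactly this way is not confirmed in this thread.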
Re: Execution of action: JobModelVersionChange failed.
Hi Omkar,

The errors are related to timeouts during the shutdown that is triggered by a rebalance. Whenever a processor joins or leaves the quorum, a rebalance is triggered, which requires every existing processor to shut down its container before the processors agree on the new job model. In your case, it looks like the container takes longer than the configured timeout (task.shutdown.ms) to shut down, and hence an exception is thrown.

Do you know what is taking so long in your shutdown sequence? Meanwhile, you can start by increasing the shutdown timeout to a higher value.

*Note:* You will also need to account for the increase in the *consensus timeout*, i.e. the time the leader of the quorum waits for the other participants to agree on the new job model. If the other processors are still in the shutdown phase, the leader may end up expiring the current barrier and triggering another rebalance.

For example, if the current setup is

*task.shutdown.ms = 1*
*job.coordinator.zk.consensus.timeout.ms = 3*

then your new setup will roughly need to be the following (depending on how much room you already have between these two configurations), where "*x*" denotes the increase in value:

*task.shutdown.ms = 1 + x*
*job.coordinator.zk.consensus.timeout.ms = 3 + x*

Let me know how it goes.

Thanks,
Bharath

On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar wrote:
> We are using Beam with the Samza runner - beam.version 2.19.0, samza.version 1.3.0
>
> And we are seeing the following exception frequently. Should we be tweaking
> some configuration? Does this point to any network connectivity issue?
>
> 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime -
> Execution of action: JobModelVersionChange failed.
> java.lang.IllegalStateException: ZkClient already closed!
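Bharath's rule of thumb, raising both timeouts by the same x so that the gap between them stays unchanged, can be sketched as below. This is a hypothetical helper, not a Samza API: the `TimeoutBump` class and `bump` method are made up, the config is modeled as a plain java.util.Properties map, and the check that the consensus timeout exceeds the shutdown timeout is an assumption drawn from the example values (3 > 1).

```java
import java.util.Properties;

/**
 * Hypothetical helper (not part of Samza): raises task.shutdown.ms and
 * job.coordinator.zk.consensus.timeout.ms by the same delta, so the slack the
 * leader has after participants finish shutting down stays the same.
 */
public final class TimeoutBump {
  static final String SHUTDOWN_KEY = "task.shutdown.ms";
  static final String CONSENSUS_KEY = "job.coordinator.zk.consensus.timeout.ms";

  private TimeoutBump() {}

  /** Returns a new Properties with both timeouts increased by deltaMs; the input is not modified. */
  static Properties bump(Properties config, long deltaMs) {
    if (deltaMs < 0) {
      throw new IllegalArgumentException("deltaMs must be non-negative");
    }
    long shutdown = Long.parseLong(config.getProperty(SHUTDOWN_KEY));
    long consensus = Long.parseLong(config.getProperty(CONSENSUS_KEY));
    // Assumption taken from the example (3 > 1): the leader should wait longer
    // than a participant may spend shutting down, or the barrier can expire.
    if (consensus <= shutdown) {
      throw new IllegalStateException(CONSENSUS_KEY + " should exceed " + SHUTDOWN_KEY);
    }
    Properties updated = new Properties();
    // Copy every key, including any inherited defaults, into the new map.
    for (String key : config.stringPropertyNames()) {
      updated.setProperty(key, config.getProperty(key));
    }
    updated.setProperty(SHUTDOWN_KEY, Long.toString(shutdown + deltaMs));
    updated.setProperty(CONSENSUS_KEY, Long.toString(consensus + deltaMs));
    return updated;
  }

  public static void main(String[] args) {
    Properties current = new Properties();
    current.setProperty(SHUTDOWN_KEY, "1");
    current.setProperty(CONSENSUS_KEY, "3");
    Properties next = bump(current, 5);
    // Prints "6 8": both values move by x = 5, the gap of 2 is unchanged.
    System.out.println(next.getProperty(SHUTDOWN_KEY) + " " + next.getProperty(CONSENSUS_KEY));
  }
}
```

Keeping the delta identical for both keys is the point of the sketch: only the absolute values grow, while the margin the leader has over a slow participant stays where it was before the change.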
Execution of action: JobModelVersionChange failed.
We are using Beam with the Samza runner - beam.version 2.19.0, samza.version 1.3.0

And we are seeing the following exception frequently. Should we be tweaking some configuration? Does this point to any network connectivity issue?

2020/03/21 21:42:09.896 INFO o.a.s.zk.ZkBarrierForVersionUpgrade - Subscribing data changes on the path: /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state for barrier version: 151.
2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: JobModelVersionChange failed.
java.lang.IllegalStateException: ZkClient already closed!
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
    at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
    at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
    at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
    at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
    at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
    at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception in debounce timer! Stopping the job coordinator
java.lang.IllegalStateException: ZkClient already closed!
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
    at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
    at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
    at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
    at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
    at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
    at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.897 INFO org.apache.samza.zk.ZkJobCoordinator - Job Coordinator shutdown is in progress!
2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer - Caught exception/error in run loop.
org.apache.samza.SamzaException: Run loop is interrupted
    at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method)
    at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
    ... 7 common frames omitted
2020/03/21 21:42:09.898 INFO o.a.samza.container.SamzaContainer - Shutting down SamzaContainer.
2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult - Container shutdown timed out after 1 ms.
java.util.concurrent.TimeoutException: Container shutdown timed out after