Re: Execution of action: JobModelVersionChange failed.

2020-03-24 Thread Bharath Kumara Subramanian
Deshpande, Omkar wrote:
>
> > We are using Beam with the Samza runner - beam.version 2.19.0,
> > samza.version 1.3.0
> >
> > And we are seeing the following exception frequently. Should we be
> > tweaking some configuration? Does this point to any network
> > connectivity issue?
> >
> > [...]

Re: Execution of action: JobModelVersionChange failed.

2020-03-23 Thread Deshpande, Omkar
Hey Bharath,

I think I know what is taking so long in my shutdown sequence.

My code looks roughly like this:
https://gist.github.com/omkardeshpande8/dc4259a8aa7a726a4fe787d9ece8f44a
I think the Thread.sleep(3) in this code is increasing the time taken by the
shutdown sequence. Is that correct?
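A minimal sketch of why a sleep on a shutdown path matters, assuming (the gist is not visible here) that the sleep sits in a loop on a thread that gets interrupted during shutdown, much as the "Run loop is interrupted" entry in the original log shows for the container's own run loop. `InterruptibleWorker` is a hypothetical class: each pause adds its full duration to the shutdown time unless the thread treats an interrupt as a request to stop. A loop that catches `InterruptedException`, ignores it and sleeps again would instead keep the thread alive past the shutdown timeout.

```java
// Hypothetical worker loop, for illustration only. It pauses with Thread.sleep
// between iterations but treats an interrupt as a request to stop: it restores
// the interrupt flag and returns promptly instead of swallowing the interrupt
// and sleeping again.
final class InterruptibleWorker implements Runnable {
    private final long pauseMs;
    private volatile int iterations;

    InterruptibleWorker(long pauseMs) {
        this.pauseMs = pauseMs;
    }

    int iterations() {
        return iterations;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            iterations++; // placeholder for the real per-iteration work
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the signal for callers
                return; // stop promptly so shutdown is not delayed
            }
        }
    }
}
```

With this shape, a long pause costs the shutdown almost nothing, because the interrupt cuts the sleep short.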

So what does stopping a Samza container mean in the context of a Beam pipeline?
Does pres.waitUntilFinish() return when the Samza container is being stopped?

We added Runtime.getRuntime().halt() because, in many cases, the JVM running our
Beam pipeline hung instead of exiting.
Do you have suggestions for a better way to handle this?
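One common alternative to an unconditional `Runtime.getRuntime().halt()` is a bounded watchdog: wait a grace period for a clean finish, and force the exit only if that period passes. The sketch below assumes a hypothetical `CountDownLatch` that your code counts down once `pres.waitUntilFinish()` returns; `ExitWatchdog`, the grace period and the `forceExit` action are all illustrative, and the forced exit is injected so the logic can be tested without killing the JVM.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical watchdog, for illustration only: waits up to graceMs for the
// pipeline to signal completion and runs the forced-exit action only if it
// has not finished by then.
final class ExitWatchdog {

    /**
     * Returns true if `finished` reached zero within graceMs; otherwise runs
     * forceExit and returns false.
     */
    static boolean awaitOrForce(CountDownLatch finished, long graceMs, Runnable forceExit)
            throws InterruptedException {
        if (finished.await(graceMs, TimeUnit.MILLISECONDS)) {
            return true; // clean exit path, nothing forced
        }
        forceExit.run(); // last resort, e.g. () -> Runtime.getRuntime().halt(1)
        return false;
    }
}
```

In production `forceExit` could be `() -> Runtime.getRuntime().halt(1)`. Note that `halt` skips shutdown hooks, so it is best kept as the last resort after the grace period rather than the default exit path.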

Also, does rebalancing always involve stopping all the containers?
We are running on Kubernetes (K8s), and pods are often moved around. Every time a pod
is moved, a rebalance is triggered, and the rebalance in turn restarts all the other pods.
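The `ScheduleAfterDebounceTime` class in the stack trace suggests that membership changes are funnelled through a debounce timer before a new job model is computed. The sketch below is a simplified, hypothetical illustration of that idea, not Samza's code: scheduling an action under a name cancels any still-pending action with the same name, so a burst of changes within the debounce window produces a single run. Whether several pod moves actually collapse into one rebalance would depend on the real debounce interval and on how far apart the moves are.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical debouncer, for illustration only (not Samza's
// ScheduleAfterDebounceTime). Re-scheduling an action under the same name
// cancels the earlier pending one, so rapid repeats coalesce into one run.
final class Debouncer implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> pending = new HashMap<>();

    synchronized void schedule(String name, long debounceMs, Runnable action) {
        ScheduledFuture<?> previous = pending.get(name);
        if (previous != null) {
            previous.cancel(false); // drop the earlier, not-yet-run action
        }
        pending.put(name, timer.schedule(action, debounceMs, TimeUnit.MILLISECONDS));
    }

    @Override
    public void close() {
        timer.shutdownNow(); // stop the timer thread
    }
}
```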



Re: Execution of action: JobModelVersionChange failed.

2020-03-22 Thread Bharath Kumara Subramanian
Hi Omkar,

The errors are related to timeouts during the shutdown that is triggered by
a rebalance. Whenever a processor joins or leaves the quorum, a rebalance is
triggered, which requires every existing processor to shut down its
container before the processors agree on the new job model.
In your case, it looks like the container is taking longer than the configured
timeout (task.shutdown.ms) to shut down, and hence an exception is thrown.
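A minimal sketch of the timeout behaviour described above, assuming the shutdown can be modelled as a single stop action that must finish within `task.shutdown.ms`. `BoundedShutdown` is hypothetical and is not Samza's implementation; it only reproduces the observable outcome in the original log, a `TimeoutException` saying the container shutdown timed out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a shutdown bounded by a timeout such as task.shutdown.ms.
// Not Samza's code; it mirrors the "Container shutdown timed out after N ms." log.
final class BoundedShutdown {

    /**
     * Runs stopAction on a helper thread and waits at most timeoutMs for it.
     * Throws TimeoutException if the action has not finished in time.
     */
    static void shutdown(Runnable stopAction, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> stop = executor.submit(stopAction);
            try {
                stop.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                stop.cancel(true); // interrupt the slow stop action
                throw new TimeoutException(
                        "Container shutdown timed out after " + timeoutMs + " ms.");
            }
        } finally {
            executor.shutdownNow(); // release the helper thread either way
        }
    }
}
```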

Do you know what is taking so long in your shutdown sequence?

Meanwhile, you can start by increasing the shutdown timeout.
*Note:* You will need to reflect this increase in the *consensus timeout* as
well, which is the time the leader of the quorum waits for the other
participants to agree on the new job model. If the other processors are still
shutting down when the consensus timeout elapses, the leader may expire the
current barrier and trigger another rebalance.

For example, if the current setup is
*task.shutdown.ms = 1*
*job.coordinator.zk.consensus.timeout.ms = 3*
then your new setup will roughly need to be the following (depending on how
much room you already have between these two configurations), where "*x*"
denotes the increase in the value:
*task.shutdown.ms = 1 + x*
*job.coordinator.zk.consensus.timeout.ms = 3 + x*
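The adjustment is simple arithmetic. The hypothetical helper below raises both settings by the same `x`, so the gap between them stays what it was. The property names come from this thread; the check that the consensus timeout exceeds the shutdown timeout is an assumption drawn from the example values, not a documented Samza rule.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only: raise task.shutdown.ms and
// job.coordinator.zk.consensus.timeout.ms by the same x so the headroom
// between them is preserved.
final class TimeoutTuning {

    static Properties bump(long shutdownMs, long consensusMs, long x) {
        // Assumed sanity checks, not taken from Samza documentation.
        if (consensusMs <= shutdownMs) {
            throw new IllegalArgumentException(
                    "consensus timeout is expected to exceed the shutdown timeout");
        }
        if (x < 0) {
            throw new IllegalArgumentException("x must be non-negative");
        }
        Properties props = new Properties();
        props.setProperty("task.shutdown.ms", Long.toString(shutdownMs + x));
        props.setProperty("job.coordinator.zk.consensus.timeout.ms",
                Long.toString(consensusMs + x));
        return props;
    }
}
```

`TimeoutTuning.bump(1, 3, x)` yields `task.shutdown.ms = 1 + x` and `job.coordinator.zk.consensus.timeout.ms = 3 + x`, matching the example.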
Let me know how it goes.

Thanks,
Bharath


Execution of action: JobModelVersionChange failed.

2020-03-21 Thread Deshpande, Omkar
We are using Beam with the Samza runner: beam.version 2.19.0, samza.version 1.3.0.

We are frequently seeing the following exception. Should we be tweaking some
configuration? Does this point to a network connectivity issue?

2020/03/21 21:42:09.896 INFO  o.a.s.zk.ZkBarrierForVersionUpgrade - Subscribing data changes on the path: /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state for barrier version: 151.
2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: JobModelVersionChange failed.
java.lang.IllegalStateException: ZkClient already closed!
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception in debounce timer! Stopping the job coordinator
java.lang.IllegalStateException: ZkClient already closed!
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.897 INFO  org.apache.samza.zk.ZkJobCoordinator - Job Coordinator shutdown is in progress!
2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer - Caught exception/error in run loop.
org.apache.samza.SamzaException: Run loop is interrupted
at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method)
at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
... 7 common frames omitted
2020/03/21 21:42:09.898 INFO  o.a.samza.container.SamzaContainer - Shutting down SamzaContainer.
2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult - Container shutdown timed out after 1 ms.
java.util.concurrent.TimeoutException: Container shutdown timed out after