Re: Execution of action: JobModelVersionChange failed.

2020-03-24 Thread Bharath Kumara Subramanian
Hi Omkar,

> So what does stopping a samza container mean in the context of a Beam Pipeline?


A container shutting down during a rebalance shouldn't affect the Beam
application unless the shutdown was unsuccessful.


> Does pres.waitUntilFinish() return when samza container is being stopped ?
>

waitUntilFinish() is a blocking call and will only return when the
application exits (e.g. non-streaming jobs) or in the event of application
errors (errors within the samza container, etc.).
In a standalone setup, a samza application can either run as an individual
application or alongside other services sharing the same JVM. For this
reason, runtime errors in the samza container don't bring down the JVM by
default. However, some applications may want to bring the JVM down, and
that can be achieved with a monitor thread that waits on
*pipeline.waitUntilFinish()* and then triggers *System.exit()*,
*Runtime.halt()*, or other necessary steps.
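A minimal sketch of that monitor-thread pattern (my illustration, not code from this thread; the `CountDownLatch` stands in for the blocking `pipeline.waitUntilFinish()` call):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PipelineExitMonitor {
    public static void main(String[] args) throws Exception {
        // Stand-in for the running pipeline; in a real job this latch is
        // replaced by the blocking call pipeline.waitUntilFinish().
        CountDownLatch pipelineDone = new CountDownLatch(1);

        Thread monitor = new Thread(() -> {
            try {
                // Blocks until the pipeline exits or errors out, the way
                // pipeline.waitUntilFinish() would.
                pipelineDone.await();
                // A real application would call System.exit(0) or
                // Runtime.getRuntime().halt(0) here.
                System.out.println("pipeline finished; requesting JVM exit");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "pipeline-exit-monitor");
        monitor.start();

        // Simulate the container erroring out / pipeline finishing.
        pipelineDone.countDown();
        monitor.join(TimeUnit.SECONDS.toMillis(5));
    }
}
```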

> And does rebalancing always include stopping all the containers?
Yes. Rebalances are handled by coordinators, which bring down the existing
containers and spin up new ones.

> I think Thread.sleep(3) in this code is increasing the time taken for
> the shutdown sequence. Is that correct?
I don't think that is causing delays in the shutdown. That block of code
only gets triggered after *waitUntilFinish* returns, which happens due to
an exception in the container shutdown sequence. You may want to check
whether any pluggable components (Consumers, TableProviders, etc.) take a
long time to shut down.

As for your code snippet, I think the reason shutdown signals don't bring
down your application is a deadlock between your main thread, which calls
*System.exit()*, and the shutdown hook, which waits on *mainThread.join()*.
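To make the deadlock concrete (a self-contained sketch, not code from your gist): *System.exit()* blocks its calling thread until all shutdown hooks finish, so a hook that does an unbounded *mainThread.join()* waits forever on the very thread that is waiting on it. A bounded join is one way to break the cycle:

```java
public class ShutdownHookJoin {
    public static void main(String[] args) {
        final Thread mainThread = Thread.currentThread();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // An unbounded mainThread.join() here would deadlock:
                // System.exit() below blocks the main thread until this hook
                // finishes, while this hook waits for the main thread to die.
                // Bounding the wait breaks the cycle.
                mainThread.join(2000);
                System.out.println("shutdown hook done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "shutdown-hook"));

        System.out.println("main calling System.exit");
        System.exit(0);
    }
}
```

With the unbounded `join()`, this program never exits; with the bounded join it terminates after the two-second wait.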


Thanks,
Bharath

On Mon, Mar 23, 2020 at 3:28 PM Deshpande, Omkar
 wrote:

> Hey Bharath,
>
> I probably know what is taking long in my shutdown sequence.
>
> My code roughly looks like this -
> https://gist.github.com/omkardeshpande8/dc4259a8aa7a726a4fe787d9ece8f44a
> I think Thread.sleep(3)  in this code is increasing the time taken for
> the shutdown sequence. Is that correct?
>
> So what does stopping a samza container mean in context of a Beam
> Pipeline? Does pres.waitUntilFinish() return when samza container is being
> stopped ?
>
> We added Runtime.getRuntime().halt() because our JVM running a Beam
> pipeline was hanging up without exiting, in a lot of cases.
> Do you have suggestions on a better way to handle this?
>
> And does rebalancing always include stopping all the containers?
> We are running on K8S and the pods are often moved around. And every time
> a pod is moved, rebalance will be triggered.
> And the rebalance in turn will restart all other pods.
>
>
> On 3/22/20, 10:29 PM, "Bharath Kumara Subramanian" <
> codin.mart...@gmail.com> wrote:
>
> Hi Omkar,
>
> The errors are related to timeouts during shutdown, which gets triggered
> during a rebalance. Whenever a new processor joins or leaves the quorum,
> a rebalance is triggered, which requires all the existing processors to
> shut down their containers before agreeing on the new job model.
> In your case, it looks like the container is taking longer than the
> configured timeout (task.shutdown.ms), hence the exception.
>
> Do you know what is taking so long in your shutdown sequence?
>
> Meanwhile, you can start by increasing the shutdown timeout.
> *Note:* You will need to account for the increase in the *consensus timeout*
> - the time the leader of the quorum will wait for other participants to
> agree on the new job model. If the other processors are still in the
> shutdown phase, the leader may end up expiring the current barrier and
> trigger another rebalance.
>
> E.g., if the current setup is
> task.shutdown.ms = 1
> job.coordinator.zk.consensus.timeout.ms = 3
> then your new setup will roughly (depending on how much room you already
> have between these two configurations) need to be the following, where "x"
> denotes the increase in the value:
> task.shutdown.ms = 1 + x
> job.coordinator.zk.consensus.timeout.ms = 3 + x
> Let me know how it goes.
>
> Thanks,
> Bharath
>
> On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar
>  wrote:
>
> > We are using beam with samza runner - beam.version 2.19.0, samza.version
> > 1.3.0
> >
> > And we are seeing the following exception frequently. Should we be
> > tweaking some configuration? Does this point to any network

Re: Execution of action: JobModelVersionChange failed.

2020-03-22 Thread Bharath Kumara Subramanian
Hi Omkar,

The errors are related to timeouts during shutdown, which gets triggered
during a rebalance. Whenever a new processor joins or leaves the quorum,
a rebalance is triggered, which requires all the existing processors to
shut down their containers before agreeing on the new job model.
In your case, it looks like the container is taking longer than the
configured timeout (task.shutdown.ms), hence the exception.

Do you know what is taking so long in your shutdown sequence?

Meanwhile, you can start by increasing the shutdown timeout.
*Note:* You will need to account for the increase in the *consensus timeout*
- the time the leader of the quorum will wait for other participants to
agree on the new job model. If the other processors are still in the
shutdown phase, the leader may end up expiring the current barrier and
trigger another rebalance.

E.g., if the current setup is
task.shutdown.ms = 1
job.coordinator.zk.consensus.timeout.ms = 3
then your new setup will roughly (depending on how much room you already
have between these two configurations) need to be the following, where "x"
denotes the increase in the value:
task.shutdown.ms = 1 + x
job.coordinator.zk.consensus.timeout.ms = 3 + x
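Purely as an illustration (the numbers below are placeholders I chose, not values from this thread), the two settings would move together in the job config:

```properties
# Container shutdown budget, raised by an illustrative x = 30000 ms.
task.shutdown.ms=40000
# Leader's wait for processors to agree on the new job model, raised by
# the same delta so followers can finish shutting down first.
job.coordinator.zk.consensus.timeout.ms=60000
```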
Let me know how it goes.

Thanks,
Bharath

On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar
 wrote:

> We are using beam with samza runner - beam.version 2.19.0, samza.version
> 1.3.0
>
> And we are seeing the following exception frequently. Should we be tweaking
> some configuration? Does this point to any network connectivity issue?
>
> 2020/03/21 21:42:09.896 INFO  o.a.s.zk.ZkBarrierForVersionUpgrade -
> Subscribing data changes on the path:
> /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state
> for barrier version: 151.
> 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime -
> Execution of action: JobModelVersionChange failed.
> java.lang.IllegalStateException: ZkClient already closed!
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
> at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
> at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
> at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
> at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
> at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
> at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator -
> Received exception in debounce timer! Stopping the job coordinator
> java.lang.IllegalStateException: ZkClient already closed!
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
> at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
> at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
> at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
> at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
> at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
> at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at