Re: akka timeout exception

2018-11-08 Thread Anil
Thanks for the reply, Dawid. The Flink jobs are deployed in a YARN cluster. I am
seeing the error in the Job Manager log for some jobs too frequently. I'm using
Flink 1.4.2 and running only streaming jobs.





Re: akka timeout exception

2018-11-08 Thread K Fred
Hi,

I got the same exception when running in a Flink cluster. The settings are
below:

flink version: 1.5.4

flink-conf.yaml:
jobmanager.heap.mb: 102400
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 40
parallelism.default: 40

I have 5 task managers.

My code just reads data from an HBase table and writes it to another table. The
data size is about 1 TB.
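
For what it's worth, one commonly suggested mitigation for this kind of
AskTimeoutException is to raise the akka timeouts in flink-conf.yaml. This is
only an illustrative sketch, not verified for this setup; the key names are the
standard Flink ones and the values are placeholders:

akka.ask.timeout: 60 s
akka.client.timeout: 120 s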

Thanks!



On Thu, Nov 8, 2018 at 5:50 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> Could you provide us with some more information? Which version of flink
> are you running? In which cluster setup? When does this exception occur?
> This exception says that request for status overview (no of
> taskmanagers, slots info etc.) failed.
>
> Best,
>
> Dawid
>
> On 31/10/2018 20:05, Anil wrote:
> > getting this error in my job manager too frequently. any help. Thanks!
> >
> > java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException:
> > Ask timed out on [Actor[akka://flink/user/jobmanager#1927353472]] after
> > [1 ms]. Sender[null] sent message of type
> > "org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview".
> >   at
> >
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >   at
> >
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >   at
> >
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> >   at
> >
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> >   at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >   at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >   at
> >
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
> >   at akka.dispatch.OnComplete.internal(Future.scala:258)
> >   at akka.dispatch.OnComplete.internal(Future.scala:256)
> >   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> >   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> >   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >   at
> >
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> >   at
> > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> >   at
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> >   at scala.concurrent.Promise$class.complete(Promise.scala:55)
> >   at
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
> >   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
> >   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
> >   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >   at
> >
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
> >   at
> >
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
> >   at
> >
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
> >   at
> >
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
> >   at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> >   at
> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
> >   at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> >   at
> >
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
> >   at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> >   at
> > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> >   at
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> >   at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
> >   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> >   at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> >   at
> >
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> >   at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> >   at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> >   at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> >   at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> >   at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> >   at java.lang.Thread.run(Thread.java:748)
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> > 

Re: akka timeout exception

2018-11-08 Thread Dawid Wysakowicz
Hi,

Could you provide us with some more information? Which version of flink
are you running? In which cluster setup? When does this exception occur?
This exception says that a request for the status overview (number of
taskmanagers, slot info, etc.) failed.

Best,

Dawid

On 31/10/2018 20:05, Anil wrote:
> getting this error in my job manager too frequently. any help. Thanks!
>
> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
> Ask timed out on [Actor[akka://flink/user/jobmanager#1927353472]] after
> [1 ms]. Sender[null] sent message of type
> "org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview".
>   at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>   at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
>   at akka.dispatch.OnComplete.internal(Future.scala:258)
>   at akka.dispatch.OnComplete.internal(Future.scala:256)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>   at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>   at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>   at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>   at scala.concurrent.Promise$class.complete(Promise.scala:55)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>   at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>   at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>   at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>   at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>   at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>   at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>   at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>   at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>   at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>   at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>   at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>   at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>   at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>   at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>   at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>   at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager#1927353472]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview".
>   at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>   ... 9 more
>
>
>





Re: akka timeout

2017-09-26 Thread Till Rohrmann
Alright. Glad to hear that things are now working :-)

On Tue, Sep 26, 2017 at 9:55 AM, Steven Wu  wrote:

> Till, sorry for the confusion. I meant Flink documentation has the correct
> info. our code was mistakenly referring to akka.ask.timeout for death watch.
>
> On Mon, Sep 25, 2017 at 3:52 PM, Till Rohrmann 
> wrote:
>
>> Quick question Steven. Where did you find the documentation concerning
>> that the death watch interval is linked to the akka ask timeout? It was
>> included in the past, but I couldn't find it anymore.
>>
>> Cheers,
>> Till
>>
>> On Mon, Sep 25, 2017 at 9:47 AM, Till Rohrmann 
>> wrote:
>>
>>> Great to hear that you could figure things out Steven.
>>>
>>> You are right. The death watch is no longer linked to the akka ask
>>> timeout, because of FLINK-6495. Thanks for the feedback. I will correct the
>>> documentation.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Sep 23, 2017 at 10:24 AM, Steven Wu 
>>> wrote:
>>>
 just to close the thread. akka death watch was triggered by high GC
 pause, which is caused by memory leak in our code during Flink job restart.

 noted that akka.ask.timeout wasn't related to akka death watch, which
 Flink has documented and linked.

 On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu 
 wrote:

> this is a stateless job. so we don't use RocksDB.
>
> yeah. network can also be a possibility. will keep it in the radar.
> unfortunately, our metrics system don't have the tcp metrics when running
> inside containers.
>
> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
> wrote:
>
>> Hi,
>> are you using the RocksDB state backend already?
>> Maybe writing the state to disk would actually reduce the pressure on
>> the GC (but of course it'll also reduce throughput a bit).
>>
>> Are there any known issues with the network? Maybe the network bursts
>> on restart cause the timeouts?
>>
>>
>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu 
>> wrote:
>>
>>> Bowen,
>>>
>>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high
>>> GC pause and akka timeout was happening. So maybe memory allocation and 
>>> GC
>>> wasn't really an issue. I also recently learned that JVM can pause for
>>> writing to GC log for disk I/O. that is another lead I am pursuing.
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
>>> wrote:
>>>
 Hi Steven,
 Yes, GC is a big overhead, it may cause your CPU utilization to
 reach 100%, and every process stopped working. We ran into this a 
 while too.

 How much memory did you assign to TaskManager? How much the
 your CPU utilization when your taskmanager is considered 'killed'?

 Bowen



 On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu 
 wrote:

> Till,
>
> Once our job was restarted for some reason (e.g. taskmangaer
> container got killed), it can stuck in continuous restart loop for 
> hours.
> Right now, I suspect it is caused by GC pause during restart, our job 
> has
> very high memory allocation in steady state. High GC pause then 
> caused akka
> timeout, which then caused jobmanager to think taksmanager containers 
> are
> unhealthy/dead and kill them. And the cycle repeats...
>
> But I hasn't been able to prove or disprove it yet. When I was
> asking the question, I was still sifting through metrics and error 
> logs.
>
> Thanks,
> Steven
>
>
> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
> till.rohrm...@gmail.com> wrote:
>
>> Hi Steven,
>>
>> quick correction for Flink 1.2. Indeed the MetricFetcher does not
>> pick up the right timeout value from the configuration. Instead it 
>> uses a
>> hardcoded 10s timeout. This has only been changed recently and is 
>> already
>> committed in the master. So with the next release 1.4 it will 
>> properly pick
>> up the right timeout settings.
>>
>> Just out of curiosity, what's the instability issue you're
>> observing?
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
>> wrote:
>>
>>> Till/Chesnay, thanks for the answers. Look like this is a
>>> result/symptom of underline stability issue that I am trying to 
>>> track down.
>>>
>>> It is Flink 1.2.

Re: akka timeout

2017-09-26 Thread Steven Wu
Till, sorry for the confusion. I meant that the Flink documentation has the correct
info; our code was mistakenly referring to akka.ask.timeout for the death watch.
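
For reference, the death watch itself is tuned by its own options in
flink-conf.yaml; an illustrative sketch (standard key names, values are
placeholders rather than a recommendation):

akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 60 s
akka.watch.threshold: 12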

On Mon, Sep 25, 2017 at 3:52 PM, Till Rohrmann  wrote:

> Quick question Steven. Where did you find the documentation concerning
> that the death watch interval is linked to the akka ask timeout? It was
> included in the past, but I couldn't find it anymore.
>
> Cheers,
> Till
>
> On Mon, Sep 25, 2017 at 9:47 AM, Till Rohrmann 
> wrote:
>
>> Great to hear that you could figure things out Steven.
>>
>> You are right. The death watch is no longer linked to the akka ask
>> timeout, because of FLINK-6495. Thanks for the feedback. I will correct the
>> documentation.
>>
>> Cheers,
>> Till
>>
>> On Sat, Sep 23, 2017 at 10:24 AM, Steven Wu  wrote:
>>
>>> just to close the thread. akka death watch was triggered by high GC
>>> pause, which is caused by memory leak in our code during Flink job restart.
>>>
>>> noted that akka.ask.timeout wasn't related to akka death watch, which
>>> Flink has documented and linked.
>>>
>>> On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu 
>>> wrote:
>>>
 this is a stateless job. so we don't use RocksDB.

 yeah. network can also be a possibility. will keep it in the radar.
 unfortunately, our metrics system don't have the tcp metrics when running
 inside containers.

 On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
 wrote:

> Hi,
> are you using the RocksDB state backend already?
> Maybe writing the state to disk would actually reduce the pressure on
> the GC (but of course it'll also reduce throughput a bit).
>
> Are there any known issues with the network? Maybe the network bursts
> on restart cause the timeouts?
>
>
> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu 
> wrote:
>
>> Bowen,
>>
>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high
>> GC pause and akka timeout was happening. So maybe memory allocation and 
>> GC
>> wasn't really an issue. I also recently learned that JVM can pause for
>> writing to GC log for disk I/O. that is another lead I am pursuing.
>>
>> Thanks,
>> Steven
>>
>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
>> wrote:
>>
>>> Hi Steven,
>>> Yes, GC is a big overhead, it may cause your CPU utilization to
>>> reach 100%, and every process stopped working. We ran into this a while 
>>> too.
>>>
>>> How much memory did you assign to TaskManager? How much the your
>>> CPU utilization when your taskmanager is considered 'killed'?
>>>
>>> Bowen
>>>
>>>
>>>
>>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu 
>>> wrote:
>>>
 Till,

 Once our job was restarted for some reason (e.g. taskmangaer
 container got killed), it can stuck in continuous restart loop for 
 hours.
 Right now, I suspect it is caused by GC pause during restart, our job 
 has
 very high memory allocation in steady state. High GC pause then caused 
 akka
 timeout, which then caused jobmanager to think taksmanager containers 
 are
 unhealthy/dead and kill them. And the cycle repeats...

 But I hasn't been able to prove or disprove it yet. When I was
 asking the question, I was still sifting through metrics and error 
 logs.

 Thanks,
 Steven


 On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
 till.rohrm...@gmail.com> wrote:

> Hi Steven,
>
> quick correction for Flink 1.2. Indeed the MetricFetcher does not
> pick up the right timeout value from the configuration. Instead it 
> uses a
> hardcoded 10s timeout. This has only been changed recently and is 
> already
> committed in the master. So with the next release 1.4 it will 
> properly pick
> up the right timeout settings.
>
> Just out of curiosity, what's the instability issue you're
> observing?
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
> wrote:
>
>> Till/Chesnay, thanks for the answers. Look like this is a
>> result/symptom of underline stability issue that I am trying to 
>> track down.
>>
>> It is Flink 1.2.
>>
>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>> ches...@apache.org> wrote:
>>
>>> The MetricFetcher always use the default akka timeout value.
>>>
>>>
>>> On 18.08.2017 09:07, Till Rohrmann 

Re: akka timeout

2017-09-25 Thread Till Rohrmann
Quick question, Steven. Where did you find the documentation stating that
the death watch interval is linked to the akka ask timeout? It was included
in the past, but I couldn't find it anymore.

Cheers,
Till

On Mon, Sep 25, 2017 at 9:47 AM, Till Rohrmann  wrote:

> Great to hear that you could figure things out Steven.
>
> You are right. The death watch is no longer linked to the akka ask
> timeout, because of FLINK-6495. Thanks for the feedback. I will correct the
> documentation.
>
> Cheers,
> Till
>
> On Sat, Sep 23, 2017 at 10:24 AM, Steven Wu  wrote:
>
>> just to close the thread. akka death watch was triggered by high GC
>> pause, which is caused by memory leak in our code during Flink job restart.
>>
>> noted that akka.ask.timeout wasn't related to akka death watch, which
>> Flink has documented and linked.
>>
>> On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu  wrote:
>>
>>> this is a stateless job. so we don't use RocksDB.
>>>
>>> yeah. network can also be a possibility. will keep it in the radar.
>>> unfortunately, our metrics system don't have the tcp metrics when running
>>> inside containers.
>>>
>>> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
>>> wrote:
>>>
 Hi,
 are you using the RocksDB state backend already?
 Maybe writing the state to disk would actually reduce the pressure on
 the GC (but of course it'll also reduce throughput a bit).

 Are there any known issues with the network? Maybe the network bursts
 on restart cause the timeouts?


 On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu 
 wrote:

> Bowen,
>
> Heap size is ~50G. CPU was actually pretty low (like <20%) when high
> GC pause and akka timeout was happening. So maybe memory allocation and GC
> wasn't really an issue. I also recently learned that JVM can pause for
> writing to GC log for disk I/O. that is another lead I am pursuing.
>
> Thanks,
> Steven
>
> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
> wrote:
>
>> Hi Steven,
>> Yes, GC is a big overhead, it may cause your CPU utilization to
>> reach 100%, and every process stopped working. We ran into this a while 
>> too.
>>
>> How much memory did you assign to TaskManager? How much the your
>> CPU utilization when your taskmanager is considered 'killed'?
>>
>> Bowen
>>
>>
>>
>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu 
>> wrote:
>>
>>> Till,
>>>
>>> Once our job was restarted for some reason (e.g. taskmangaer
>>> container got killed), it can stuck in continuous restart loop for 
>>> hours.
>>> Right now, I suspect it is caused by GC pause during restart, our job 
>>> has
>>> very high memory allocation in steady state. High GC pause then caused 
>>> akka
>>> timeout, which then caused jobmanager to think taksmanager containers 
>>> are
>>> unhealthy/dead and kill them. And the cycle repeats...
>>>
>>> But I hasn't been able to prove or disprove it yet. When I was
>>> asking the question, I was still sifting through metrics and error logs.
>>>
>>> Thanks,
>>> Steven
>>>
>>>
>>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
>>> till.rohrm...@gmail.com> wrote:
>>>
 Hi Steven,

 quick correction for Flink 1.2. Indeed the MetricFetcher does not
 pick up the right timeout value from the configuration. Instead it 
 uses a
 hardcoded 10s timeout. This has only been changed recently and is 
 already
 committed in the master. So with the next release 1.4 it will properly 
 pick
 up the right timeout settings.

 Just out of curiosity, what's the instability issue you're
 observing?

 Cheers,
 Till

 On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
 wrote:

> Till/Chesnay, thanks for the answers. Look like this is a
> result/symptom of underline stability issue that I am trying to track 
> down.
>
> It is Flink 1.2.
>
> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
> ches...@apache.org> wrote:
>
>> The MetricFetcher always use the default akka timeout value.
>>
>>
>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>
>> Hi Steven,
>>
>> I thought that the MetricFetcher picks up the right timeout from
>> the configuration. Which version of Flink are you using?
>>
>> The timeout is not a critical problem for the job health.
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 

Re: akka timeout

2017-09-25 Thread Till Rohrmann
Great to hear that you could figure things out Steven.

You are right. The death watch is no longer linked to the akka ask timeout,
because of FLINK-6495. Thanks for the feedback. I will correct the
documentation.

Cheers,
Till

On Sat, Sep 23, 2017 at 10:24 AM, Steven Wu  wrote:

> just to close the thread. akka death watch was triggered by high GC pause,
> which is caused by memory leak in our code during Flink job restart.
>
> noted that akka.ask.timeout wasn't related to akka death watch, which
> Flink has documented and linked.
>
> On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu  wrote:
>
>> this is a stateless job. so we don't use RocksDB.
>>
>> yeah. network can also be a possibility. will keep it in the radar.
>> unfortunately, our metrics system don't have the tcp metrics when running
>> inside containers.
>>
>> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
>> wrote:
>>
>>> Hi,
>>> are you using the RocksDB state backend already?
>>> Maybe writing the state to disk would actually reduce the pressure on
>>> the GC (but of course it'll also reduce throughput a bit).
>>>
>>> Are there any known issues with the network? Maybe the network bursts on
>>> restart cause the timeouts?
>>>
>>>
>>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu  wrote:
>>>
 Bowen,

 Heap size is ~50G. CPU was actually pretty low (like <20%) when high GC
 pause and akka timeout was happening. So maybe memory allocation and GC
 wasn't really an issue. I also recently learned that JVM can pause for
 writing to GC log for disk I/O. that is another lead I am pursuing.

 Thanks,
 Steven

 On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
 wrote:

> Hi Steven,
> Yes, GC is a big overhead, it may cause your CPU utilization to
> reach 100%, and every process stopped working. We ran into this a while 
> too.
>
> How much memory did you assign to TaskManager? How much the your
> CPU utilization when your taskmanager is considered 'killed'?
>
> Bowen
>
>
>
> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu 
> wrote:
>
>> Till,
>>
>> Once our job was restarted for some reason (e.g. taskmangaer
>> container got killed), it can stuck in continuous restart loop for hours.
>> Right now, I suspect it is caused by GC pause during restart, our job has
>> very high memory allocation in steady state. High GC pause then caused 
>> akka
>> timeout, which then caused jobmanager to think taksmanager containers are
>> unhealthy/dead and kill them. And the cycle repeats...
>>
>> But I hasn't been able to prove or disprove it yet. When I was asking
>> the question, I was still sifting through metrics and error logs.
>>
>> Thanks,
>> Steven
>>
>>
>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
>> till.rohrm...@gmail.com> wrote:
>>
>>> Hi Steven,
>>>
>>> quick correction for Flink 1.2. Indeed the MetricFetcher does not
>>> pick up the right timeout value from the configuration. Instead it uses 
>>> a
>>> hardcoded 10s timeout. This has only been changed recently and is 
>>> already
>>> committed in the master. So with the next release 1.4 it will properly 
>>> pick
>>> up the right timeout settings.
>>>
>>> Just out of curiosity, what's the instability issue you're observing?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
>>> wrote:
>>>
 Till/Chesnay, thanks for the answers. Look like this is a
 result/symptom of underline stability issue that I am trying to track 
 down.

 It is Flink 1.2.

 On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
 ches...@apache.org> wrote:

> The MetricFetcher always use the default akka timeout value.
>
>
> On 18.08.2017 09:07, Till Rohrmann wrote:
>
> Hi Steven,
>
> I thought that the MetricFetcher picks up the right timeout from
> the configuration. Which version of Flink are you using?
>
> The timeout is not a critical problem for the job health.
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
> wrote:
>
>>
>> We have set akka.ask.timeout to 60 s in yaml file. I also
>> confirmed the setting in Flink UI. But I saw akka timeout of 10 s for
>> metric query service. two questions
>> 1) why doesn't metric query use the 60 s value configured in yaml
>> file? does it always use default 10 s value?
>> 2) could this cause heartbeat failure between task manager and
>> job 

Re: akka timeout

2017-09-23 Thread Steven Wu
Just to close the thread: the akka death watch was triggered by high GC pause,
which was caused by a memory leak in our code during Flink job restart.

Noted that akka.ask.timeout wasn't related to the akka death watch, which Flink
has documented and linked.
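
For anyone debugging a similar suspicion, an illustrative way to capture
evidence is to enable GC logging and heap dumps on the Flink JVMs via
flink-conf.yaml (standard HotSpot/JDK 8 flags, paths are placeholders):

env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/flink-gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"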

On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu  wrote:

> this is a stateless job. so we don't use RocksDB.
>
> yeah. network can also be a possibility. will keep it in the radar.
> unfortunately, our metrics system don't have the tcp metrics when running
> inside containers.
>
> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
> wrote:
>
>> Hi,
>> are you using the RocksDB state backend already?
>> Maybe writing the state to disk would actually reduce the pressure on the
>> GC (but of course it'll also reduce throughput a bit).
>>
>> Are there any known issues with the network? Maybe the network bursts on
>> restart cause the timeouts?
>>
>>
>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu  wrote:
>>
>>> Bowen,
>>>
>>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high GC
>>> pause and akka timeout was happening. So maybe memory allocation and GC
>>> wasn't really an issue. I also recently learned that JVM can pause for
>>> writing to GC log for disk I/O. that is another lead I am pursuing.
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
>>> wrote:
>>>
 Hi Steven,
 Yes, GC is a big overhead, it may cause your CPU utilization to
 reach 100%, and every process stopped working. We ran into this a while 
 too.

 How much memory did you assign to TaskManager? How much the your
 CPU utilization when your taskmanager is considered 'killed'?

 Bowen



 On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu 
 wrote:

> Till,
>
> Once our job was restarted for some reason (e.g. taskmangaer container
> got killed), it can stuck in continuous restart loop for hours. Right now,
> I suspect it is caused by GC pause during restart, our job has very high
> memory allocation in steady state. High GC pause then caused akka timeout,
> which then caused jobmanager to think taksmanager containers are
> unhealthy/dead and kill them. And the cycle repeats...
>
> But I hasn't been able to prove or disprove it yet. When I was asking
> the question, I was still sifting through metrics and error logs.
>
> Thanks,
> Steven
>
>
> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
> till.rohrm...@gmail.com> wrote:
>
>> Hi Steven,
>>
>> quick correction for Flink 1.2. Indeed the MetricFetcher does not
>> pick up the right timeout value from the configuration. Instead it uses a
>> hardcoded 10s timeout. This has only been changed recently and is already
>> committed in the master. So with the next release 1.4 it will properly 
>> pick
>> up the right timeout settings.
>>
>> Just out of curiosity, what's the instability issue you're observing?
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
>> wrote:
>>
>>> Till/Chesnay, thanks for the answers. Look like this is a
>>> result/symptom of underline stability issue that I am trying to track 
>>> down.
>>>
>>> It is Flink 1.2.
>>>
>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>>> ches...@apache.org> wrote:
>>>
 The MetricFetcher always use the default akka timeout value.


 On 18.08.2017 09:07, Till Rohrmann wrote:

 Hi Steven,

 I thought that the MetricFetcher picks up the right timeout from
 the configuration. Which version of Flink are you using?

 The timeout is not a critical problem for the job health.

 Cheers,
 Till

 On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
 wrote:

>
> We have set akka.ask.timeout to 60 s in yaml file. I also
> confirmed the setting in Flink UI. But I saw akka timeout of 10 s for
> metric query service. two questions
> 1) why doesn't metric query use the 60 s value configured in yaml
> file? does it always use default 10 s value?
> 2) could this cause heartbeat failure between task manager and job
> manager? or is this jut non-critical failure that won't affect job 
> health?
>
> Thanks,
> Steven
>
> 2017-08-17 23:34:33,421 WARN 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
> timed out on [Actor[akka.tcp://flink@1.2.3.4
> :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]

Re: akka timeout

2017-08-25 Thread Steven Wu
This is a stateless job, so we don't use RocksDB.

Yeah, the network can also be a possibility; I will keep it on the radar.
Unfortunately, our metrics system doesn't have the TCP metrics when running
inside containers.

On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger  wrote:

> Hi,
> are you using the RocksDB state backend already?
> Maybe writing the state to disk would actually reduce the pressure on the
> GC (but of course it'll also reduce throughput a bit).
>
> Are there any known issues with the network? Maybe the network bursts on
> restart cause the timeouts?
>
>
> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu  wrote:
>
>> Bowen,
>>
>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high GC
>> pause and akka timeout was happening. So maybe memory allocation and GC
>> wasn't really an issue. I also recently learned that JVM can pause for
>> writing to GC log for disk I/O. that is another lead I am pursuing.
>>
>> Thanks,
>> Steven
>>
>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
>> wrote:
>>
>>> Hi Steven,
>>> Yes, GC is a big overhead, it may cause your CPU utilization to
>>> reach 100%, and every process stopped working. We ran into this a while too.
>>>
>>> How much memory did you assign to TaskManager? How much the your CPU
>>> utilization when your taskmanager is considered 'killed'?
>>>
>>> Bowen
>>>
>>>
>>>
>>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu 
>>> wrote:
>>>
 Till,

 Once our job was restarted for some reason (e.g. taskmangaer container
 got killed), it can stuck in continuous restart loop for hours. Right now,
 I suspect it is caused by GC pause during restart, our job has very high
 memory allocation in steady state. High GC pause then caused akka timeout,
 which then caused jobmanager to think taksmanager containers are
 unhealthy/dead and kill them. And the cycle repeats...

 But I hasn't been able to prove or disprove it yet. When I was asking
 the question, I was still sifting through metrics and error logs.

 Thanks,
 Steven


 On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann  wrote:

> Hi Steven,
>
> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick
> up the right timeout value from the configuration. Instead it uses a
> hardcoded 10s timeout. This has only been changed recently and is already
> committed in the master. So with the next release 1.4 it will properly 
> pick
> up the right timeout settings.
>
> Just out of curiosity, what's the instability issue you're observing?
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
> wrote:
>
>> Till/Chesnay, thanks for the answers. Look like this is a
>> result/symptom of underline stability issue that I am trying to track 
>> down.
>>
>> It is Flink 1.2.
>>
>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>> ches...@apache.org> wrote:
>>
>>> The MetricFetcher always use the default akka timeout value.
>>>
>>>
>>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>>
>>> Hi Steven,
>>>
>>> I thought that the MetricFetcher picks up the right timeout from the
>>> configuration. Which version of Flink are you using?
>>>
>>> The timeout is not a critical problem for the job health.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
>>> wrote:
>>>

 We have set akka.ask.timeout to 60 s in yaml file. I also confirmed
 the setting in Flink UI. But I saw akka timeout of 10 s for metric 
 query
 service. two questions
 1) why doesn't metric query use the 60 s value configured in yaml
 file? does it always use default 10 s value?
 2) could this cause heartbeat failure between task manager and job
 manager? or is this jut non-critical failure that won't affect job 
 health?

 Thanks,
 Steven

 2017-08-17 23:34:33,421 WARN 
 org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
 - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
 timed out on [Actor[akka.tcp://flink@1.2.3.4
 :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]
 after [1 ms] at akka.pattern.PromiseActorRef$$
 anonfun$1.apply$mcV$sp(AskSupport.scala:334) at
 akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
 scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
 at 
 scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
 at 
 

Re: akka timeout

2017-08-25 Thread Robert Metzger
Hi,
are you using the RocksDB state backend already?
Maybe writing the state to disk would actually reduce the pressure on the
GC (but of course it'll also reduce throughput a bit).

Are there any known issues with the network? Maybe the network bursts on
restart cause the timeouts?
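
In case it helps, a minimal sketch of switching to RocksDB via flink-conf.yaml
(key names as used around Flink 1.2/1.3; the checkpoint URI is a placeholder):

state.backend: rocksdb
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints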

On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu  wrote:

> Bowen,
>
> Heap size is ~50G. CPU was actually pretty low (like <20%) when high GC
> pause and akka timeout was happening. So maybe memory allocation and GC
> wasn't really an issue. I also recently learned that JVM can pause for
> writing to GC log for disk I/O. that is another lead I am pursuing.
>
> Thanks,
> Steven
>
> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
> wrote:
>
>> Hi Steven,
>> Yes, GC is a big overhead, it may cause your CPU utilization to reach
>> 100%, and every process stopped working. We ran into this a while too.
>>
>> How much memory did you assign to TaskManager? How much the your CPU
>> utilization when your taskmanager is considered 'killed'?
>>
>> Bowen
>>
>>
>>
>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu  wrote:
>>
>>> Till,
>>>
>>> Once our job was restarted for some reason (e.g. taskmangaer container
>>> got killed), it can stuck in continuous restart loop for hours. Right now,
>>> I suspect it is caused by GC pause during restart, our job has very high
>>> memory allocation in steady state. High GC pause then caused akka timeout,
>>> which then caused jobmanager to think taksmanager containers are
>>> unhealthy/dead and kill them. And the cycle repeats...
>>>
>>> But I hasn't been able to prove or disprove it yet. When I was asking
>>> the question, I was still sifting through metrics and error logs.
>>>
>>> Thanks,
>>> Steven
>>>
>>>
>>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann 
>>> wrote:
>>>
 Hi Steven,

 quick correction for Flink 1.2. Indeed the MetricFetcher does not pick
 up the right timeout value from the configuration. Instead it uses a
 hardcoded 10s timeout. This has only been changed recently and is already
 committed in the master. So with the next release 1.4 it will properly pick
 up the right timeout settings.

 Just out of curiosity, what's the instability issue you're observing?

 Cheers,
 Till

 On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
 wrote:

> Till/Chesnay, thanks for the answers. Look like this is a
> result/symptom of underline stability issue that I am trying to track 
> down.
>
> It is Flink 1.2.
>
> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler  > wrote:
>
>> The MetricFetcher always use the default akka timeout value.
>>
>>
>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>
>> Hi Steven,
>>
>> I thought that the MetricFetcher picks up the right timeout from the
>> configuration. Which version of Flink are you using?
>>
>> The timeout is not a critical problem for the job health.
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
>> wrote:
>>
>>>
>>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed
>>> the setting in Flink UI. But I saw akka timeout of 10 s for metric query
>>> service. two questions
>>> 1) why doesn't metric query use the 60 s value configured in yaml
>>> file? does it always use default 10 s value?
>>> 2) could this cause heartbeat failure between task manager and job
>>> manager? or is this jut non-critical failure that won't affect job 
>>> health?
>>>
>>> Thanks,
>>> Steven
>>>
>>> 2017-08-17 23:34:33,421 WARN 
>>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
>>> timed out on [Actor[akka.tcp://flink@1.2.3.4
>>> :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]
>>> after [1 ms] at akka.pattern.PromiseActorRef$$
>>> anonfun$1.apply$mcV$sp(AskSupport.scala:334) at
>>> akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>> at 
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> at 
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>>> at 

Re: akka timeout

2017-08-25 Thread Steven Wu
Bowen,

Heap size is ~50G. CPU was actually pretty low (like <20%) when the high GC
pauses and akka timeouts were happening, so maybe memory allocation and GC
weren't really the issue. I also recently learned that the JVM can pause while
writing the GC log due to disk I/O; that is another lead I am pursuing.
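
If the GC-log write stalls turn out to be the culprit, an illustrative,
low-risk experiment is to point the GC log at tmpfs and log stopped time
explicitly, e.g. via flink-conf.yaml (JDK 8 flags, path is a placeholder):

env.java.opts: "-Xloggc:/dev/shm/flink-gc.log -XX:+PrintGCDetails -XX:+PrintGCApplicationStoppedTime"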

Thanks,
Steven

On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li  wrote:

> Hi Steven,
> Yes, GC is a big overhead, it may cause your CPU utilization to reach
> 100%, and every process stopped working. We ran into this a while too.
>
> How much memory did you assign to TaskManager? How much the your CPU
> utilization when your taskmanager is considered 'killed'?
>
> Bowen
>
>
>
> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu  wrote:
>
>> Till,
>>
>> Once our job was restarted for some reason (e.g. taskmangaer container
>> got killed), it can stuck in continuous restart loop for hours. Right now,
>> I suspect it is caused by GC pause during restart, our job has very high
>> memory allocation in steady state. High GC pause then caused akka timeout,
>> which then caused jobmanager to think taksmanager containers are
>> unhealthy/dead and kill them. And the cycle repeats...
>>
>> But I hasn't been able to prove or disprove it yet. When I was asking the
>> question, I was still sifting through metrics and error logs.
>>
>> Thanks,
>> Steven
>>
>>
>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Steven,
>>>
>>> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick
>>> up the right timeout value from the configuration. Instead it uses a
>>> hardcoded 10s timeout. This has only been changed recently and is already
>>> committed in the master. So with the next release 1.4 it will properly pick
>>> up the right timeout settings.
>>>
>>> Just out of curiosity, what's the instability issue you're observing?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu  wrote:
>>>
 Till/Chesnay, thanks for the answers. Look like this is a
 result/symptom of underline stability issue that I am trying to track down.

 It is Flink 1.2.

 On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler 
 wrote:

> The MetricFetcher always use the default akka timeout value.
>
>
> On 18.08.2017 09:07, Till Rohrmann wrote:
>
> Hi Steven,
>
> I thought that the MetricFetcher picks up the right timeout from the
> configuration. Which version of Flink are you using?
>
> The timeout is not a critical problem for the job health.
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
> wrote:
>
>>
>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed
>> the setting in Flink UI. But I saw akka timeout of 10 s for metric query
>> service. two questions
>> 1) why doesn't metric query use the 60 s value configured in yaml
>> file? does it always use default 10 s value?
>> 2) could this cause heartbeat failure between task manager and job
>> manager? or is this jut non-critical failure that won't affect job 
>> health?
>>
>> Thanks,
>> Steven
>>
>> 2017-08-17 23:34:33,421 WARN 
>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
>> timed out on [Actor[akka.tcp://flink@1.2.3.4
>> :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]
>> after [1 ms] at akka.pattern.PromiseActorRef$$
>> anonfun$1.apply$mcV$sp(AskSupport.scala:334) at
>> akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>> at 
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at 
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>> at 
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>> at 
>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>> at 
>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>> at 
>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>> at java.lang.Thread.run(Thread.java:748)
>>
>
>
>

>>>
>>
>


Re: akka timeout

2017-08-23 Thread Bowen Li
Hi Steven,
Yes, GC is a big overhead; it may cause your CPU utilization to reach
100% and every process to stop working. We ran into this a while ago too.

How much memory did you assign to the TaskManager? What was your CPU
utilization when your taskmanager was considered 'killed'?

Bowen



On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu  wrote:

> Till,
>
> Once our job was restarted for some reason (e.g. taskmangaer container got
> killed), it can stuck in continuous restart loop for hours. Right now, I
> suspect it is caused by GC pause during restart, our job has very high
> memory allocation in steady state. High GC pause then caused akka timeout,
> which then caused jobmanager to think taksmanager containers are
> unhealthy/dead and kill them. And the cycle repeats...
>
> But I hasn't been able to prove or disprove it yet. When I was asking the
> question, I was still sifting through metrics and error logs.
>
> Thanks,
> Steven
>
>
> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann 
> wrote:
>
>> Hi Steven,
>>
>> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick up
>> the right timeout value from the configuration. Instead it uses a hardcoded
>> 10s timeout. This has only been changed recently and is already committed
>> in the master. So with the next release 1.4 it will properly pick up the
>> right timeout settings.
>>
>> Just out of curiosity, what's the instability issue you're observing?
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu  wrote:
>>
>>> Till/Chesnay, thanks for the answers. Look like this is a result/symptom
>>> of underline stability issue that I am trying to track down.
>>>
>>> It is Flink 1.2.
>>>
>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler 
>>> wrote:
>>>
 The MetricFetcher always use the default akka timeout value.


 On 18.08.2017 09:07, Till Rohrmann wrote:

 Hi Steven,

 I thought that the MetricFetcher picks up the right timeout from the
 configuration. Which version of Flink are you using?

 The timeout is not a critical problem for the job health.

 Cheers,
 Till

 On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
 wrote:

>
> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed
> the setting in Flink UI. But I saw akka timeout of 10 s for metric query
> service. two questions
> 1) why doesn't metric query use the 60 s value configured in yaml
> file? does it always use default 10 s value?
> 2) could this cause heartbeat failure between task manager and job
> manager? or is this jut non-critical failure that won't affect job health?
>
> Thanks,
> Steven
>
> 2017-08-17 23:34:33,421 WARN 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
> timed out on [Actor[akka.tcp://flink@1.2.3.4
> :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]
> after [1 ms] at akka.pattern.PromiseActorRef$$
> anonfun$1.apply$mcV$sp(AskSupport.scala:334) at
> akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:748)
>



>>>
>>
>


Re: akka timeout

2017-08-23 Thread Steven Wu
Till,

Once our job was restarted for some reason (e.g. a taskmanager container got
killed), it can get stuck in a continuous restart loop for hours. Right now, I
suspect it is caused by GC pauses during restart; our job has very high
memory allocation in steady state. High GC pauses then cause akka timeouts,
which then cause the jobmanager to think the taskmanager containers are
unhealthy/dead and kill them. And the cycle repeats...

But I haven't been able to prove or disprove it yet. When I was asking the
question, I was still sifting through metrics and error logs.

Thanks,
Steven


On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann 
wrote:

> Hi Steven,
>
> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick up
> the right timeout value from the configuration. Instead it uses a hardcoded
> 10s timeout. This has only been changed recently and is already committed
> in the master. So with the next release 1.4 it will properly pick up the
> right timeout settings.
>
> Just out of curiosity, what's the instability issue you're observing?
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu  wrote:
>
>> Till/Chesnay, thanks for the answers. Look like this is a result/symptom
>> of underline stability issue that I am trying to track down.
>>
>> It is Flink 1.2.
>>
>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler 
>> wrote:
>>
>>> The MetricFetcher always use the default akka timeout value.
>>>
>>>
>>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>>
>>> Hi Steven,
>>>
>>> I thought that the MetricFetcher picks up the right timeout from the
>>> configuration. Which version of Flink are you using?
>>>
>>> The timeout is not a critical problem for the job health.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu  wrote:
>>>

 We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
 setting in Flink UI. But I saw akka timeout of 10 s for metric query
 service. two questions
 1) why doesn't metric query use the 60 s value configured in yaml file?
 does it always use default 10 s value?
 2) could this cause heartbeat failure between task manager and job
 manager? or is this jut non-critical failure that won't affect job health?

 Thanks,
 Steven

 2017-08-17 23:34:33,421 WARN 
 org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
 - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask timed
 out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryServic
 e_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms] at
 akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
 at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
 scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
 at 
 scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
 at 
 scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
 at 
 akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
 at 
 akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
 at 
 akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
 at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
 at java.lang.Thread.run(Thread.java:748)

>>>
>>>
>>>
>>
>


Re: akka timeout

2017-08-22 Thread Till Rohrmann
Hi Steven,

quick correction for Flink 1.2. Indeed the MetricFetcher does not pick up
the right timeout value from the configuration. Instead it uses a hardcoded
10s timeout. This has only been changed recently and is already committed
in the master. So with the next release 1.4 it will properly pick up the
right timeout settings.
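
To make the distinction concrete: a flink-conf.yaml value like the one below
(taken from the setting mentioned earlier in this thread) is picked up by the
regular akka asks, while the MetricFetcher in 1.2/1.3 ignores it and uses its
hardcoded 10 s; this is only an illustrative line, not a recommended value:

akka.ask.timeout: 60 s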

Just out of curiosity, what's the instability issue you're observing?

Cheers,
Till

On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu  wrote:

> Till/Chesnay, thanks for the answers. Look like this is a result/symptom
> of underline stability issue that I am trying to track down.
>
> It is Flink 1.2.
>
> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler 
> wrote:
>
>> The MetricFetcher always use the default akka timeout value.
>>
>>
>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>
>> Hi Steven,
>>
>> I thought that the MetricFetcher picks up the right timeout from the
>> configuration. Which version of Flink are you using?
>>
>> The timeout is not a critical problem for the job health.
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu  wrote:
>>
>>>
>>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
>>> setting in Flink UI. But I saw akka timeout of 10 s for metric query
>>> service. two questions
>>> 1) why doesn't metric query use the 60 s value configured in yaml file?
>>> does it always use default 10 s value?
>>> 2) could this cause heartbeat failure between task manager and job
>>> manager? or is this jut non-critical failure that won't affect job health?
>>>
>>> Thanks,
>>> Steven
>>>
>>> 2017-08-17 23:34:33,421 WARN 
>>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask timed
>>> out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryServic
>>> e_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms] at
>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>> at 
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> at 
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>
>>
>>
>


Re: akka timeout

2017-08-18 Thread Steven Wu
Till/Chesnay, thanks for the answers. Looks like this is a result/symptom of
an underlying stability issue that I am trying to track down.

It is Flink 1.2.

On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler 
wrote:

> The MetricFetcher always use the default akka timeout value.
>
>
> On 18.08.2017 09:07, Till Rohrmann wrote:
>
> Hi Steven,
>
> I thought that the MetricFetcher picks up the right timeout from the
> configuration. Which version of Flink are you using?
>
> The timeout is not a critical problem for the job health.
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu  wrote:
>
>>
>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
>> setting in Flink UI. But I saw akka timeout of 10 s for metric query
>> service. two questions
>> 1) why doesn't metric query use the 60 s value configured in yaml file?
>> does it always use default 10 s value?
>> 2) could this cause heartbeat failure between task manager and job
>> manager? or is this jut non-critical failure that won't affect job health?
>>
>> Thanks,
>> Steven
>>
>> 2017-08-17 23:34:33,421 WARN 
>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask timed
>> out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryServic
>> e_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms] at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>> at 
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at 
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>> at akka.actor.LightArrayRevolverScheduler$TaskHolder.
>> executeTask(Scheduler.scala:474) at akka.actor.LightArrayRevolverS
>> cheduler$$anon$8.executeBucket$1(Scheduler.scala:425) at
>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>> at java.lang.Thread.run(Thread.java:748)
>>
>
>
>


Re: akka timeout

2017-08-18 Thread Till Rohrmann
Hi Steven,

I thought that the MetricFetcher picks up the right timeout from the
configuration. Which version of Flink are you using?

The timeout is not a critical problem for the job health.

Cheers,
Till

On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu  wrote:

>
> We have set akka.ask.timeout to 60 s in the yaml file. I also confirmed the
> setting in the Flink UI. But I saw an akka timeout of 10 s for the metric query
> service. Two questions:
> 1) Why doesn't the metric query use the 60 s value configured in the yaml file?
> Does it always use the default 10 s value?
> 2) Could this cause heartbeat failure between the task manager and job
> manager? Or is this just a non-critical failure that won't affect job health?
>
> Thanks,
> Steven
>
> 2017-08-17 23:34:33,421 WARN 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask timed
> out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryService_
> 23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms] at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
> scala.concurrent.Future$InternalCallbackExecutor$.
> unbatchedExecute(Future.scala:599) at scala.concurrent.
> BatchingExecutor$class.execute(BatchingExecutor.scala:109) at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:748)
>