Re: Support for multiple slots per task manager in Flink 1.5

2018-12-05 Thread
Hi Pawel,

Flink 1.5 supports multiple slots per TM. “Not fully supported yet” refers to an 
issue [1] when requesting TMs from the SlotManager, which may allocate more TMs 
than actually needed (the idle TMs will be released later).

So I suggest checking your timeout logs further to identify other issues. You 
can also set the number of slots per TM to 1 to verify this; see the sketch 
below the link.

[1] https://issues.apache.org/jira/browse/FLINK-9455?jql=text%20~%20%22slot%22
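For the verification step, a minimal flink-conf.yaml sketch (the key name is the 
standard one from the 1.5 configuration docs; adjust the rest of your config to 
your deployment):

    # flink-conf.yaml — run one slot per TaskManager to rule out slot-sharing effects
    taskmanager.numberOfTaskSlots: 1
    # Keep the overall parallelism by starting more TMs instead of more slots.
    # On YARN, passing -ys 1 to the CLI should have the same effect.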

> On Dec 5, 2018, at 7:41 PM, Pawel Bartoszek  
> wrote:
> 
> Hi,
> 
> 
> According to the Flink 1.5 release docs, multiple slots per task manager are 
> "not fully supported yet". Can you provide more information about the risks of 
> running more than one slot per TM?
> We are running Flink on EMR on YARN. Previously we ran 4 task managers with 
> 8 slots each; now we are running 32 with 1 slot each. However, since we 
> upgraded to Flink 1.5 we have started seeing timeouts when taking a connection 
> from the AWS S3 SDK pool, etc. 
> 
> Thanks.
> Pawel



Re: Flink Exception - assigned slot container was removed

2018-11-25 Thread
Hi,

It looks like some of your slots were freed during the job execution (possibly 
because they were idle for too long). AFAIK the exception is thrown when a 
pending slot request is removed. You can try increasing “slot.idle.timeout” to 
mitigate this issue (the default is 50000 ms; try 3600000 or higher). A config 
sketch is below.
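
A sketch of the relevant flink-conf.yaml entries (values are in milliseconds and 
only meant as a starting point; the last two keys are optional and listed just 
for reference):

    # flink-conf.yaml — keep idle slots alive longer so they are not released mid-job
    slot.idle.timeout: 3600000      # 1 hour; default 50000 (50 s)
    # Only if you still see slot request or heartbeat timeouts afterwards:
    slot.request.timeout: 600000    # default 300000 (5 min)
    heartbeat.timeout: 120000       # default 50000 (50 s)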

Regards,
Qi

> On Nov 26, 2018, at 7:36 AM, Flink Developer  
> wrote:
> 
> Hi, I have a Flink application sourcing from a Kafka topic (400 partitions) 
> and sinking to S3 using BucketingSink, with the RocksDB state backend and 
> checkpointing every 2 mins. The Flink app runs with parallelism 400 so that 
> each worker handles one partition. This is using Flink 1.5.2. The Flink 
> cluster uses 10 task managers with 40 slots each.
> 
> After running for a few days straight, it encounters a Flink exception:
> org.apache.flink.util.FlinkException: The assigned slot 
> container_1234567_0003_01_09_1 was removed.
> 
> This causes the Flink job to fail. It is odd to me. I am unsure what causes 
> this. Also, during this time, I see some checkpoints stating "checkpoint was 
> declined (tasks not ready)". At this point, the job is unable to recover and 
> fails. Does this happen if a slot or worker does no processing for X amount of 
> time? Would I need to increase the following Flink config properties when 
> creating the Flink cluster on YARN?
> 
> slot.idle.timeout
> slot.request.timeout
> web.timeout
> heartbeat.interval
> heartbeat.timeout
> 
> Any help would be greatly appreciated.
> 



Re: Auto/Dynamic scaling in Flink

2018-11-15 Thread
Hi Nauroz,

If you’re using Flink 1.5 on YARN, it supports dynamic task manager allocation 
by default [1]. After skimming the code, it seems to me that, in general, if the 
requested parallelism is larger than the number of available task slots, new 
task managers will be requested via the ResourceManager (please correct me if I 
have misunderstood anything). A small config sketch of that relationship is 
below the link.

We’re also looking into the auto-scaling issue in Flink, as it’s crucial for our 
use cases (e.g. in batch mode we read the input with a few TMs but write the 
output with many more). Please kindly let me know if any further information is 
available.

[1] 
https://stackoverflow.com/questions/38054015/can-yarn-dynamically-allocate-resources-to-flink/38077626#38077626
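
A small sketch of that relationship in flink-conf.yaml terms (the values are 
hypothetical and only illustrate how the slot count drives the number of 
requested TMs):

    # flink-conf.yaml — illustrative values only
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 64
    # A job running at parallelism 64 needs 64 slots, so with 4 slots per TM the
    # ResourceManager should request about 64 / 4 = 16 TaskManagers on demand,
    # and release them again once they have been idle for slot.idle.timeout.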
 


> On Nov 13, 2018, at 8:10 PM, Tzu-Li Chen  wrote:
> 
> Hi,
> 
> Flink does not yet support auto-scaling. However, there is an umbrella JIRA 
> issue [1] covering the discussion about it, and I think the attached design 
> doc (draft) [2] could help.
> 
> Best,
> tison.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10407 
> 
> [2] 
> https://docs.google.com/document/d/1XKDXnrp8w45k2jIJNHxpNP2Zmr6FGru3H0dTHwYWJyE/edit
> 
> Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote on Tue, Nov 13, 2018, at 8:05 PM:
> Hi,
> 
> Flink does not support auto-scaling yet. Rescaling operations are currently 
> always manual, i.e. take a savepoint of the Flink job and, when restoring from 
> the savepoint, define a new parallelism for the job.
> As for the metrics to be used for auto-scaling, I can imagine that it would be 
> possible to base this on metrics such as TM throughput, backpressure, etc. 
> AFAIK, there also currently isn't any official design or ongoing discussion 
> for this.
> 
> Cheers,
> Gordon
> 
> On Mon, Nov 12, 2018 at 5:50 PM Nauroz Khan Nausherwani 
> wrote:
> Dear Flink Contributors and users,
> 
> I am a PhD student and I was interested to know which metrics Flink uses, and 
> when Flink performs scaling in or scaling out of resources. I searched the 
> Flink website but could only find information about how dynamic scaling is 
> performed for stateless or stateful operators. It would be interesting to know 
> which metrics Flink uses, and when Flink actually triggers auto-scaling.
> 
> Any link or reference paper with the required information is appreciated.
> 
> best regards,
> Nauroz