Re-adding user mailing list.

Hi,

If it is a GC issue, only GC logs or some JVM memory profilers (like Oracle’s 
Mission Control) can lead you to the solution. Once you confirm that it’s a GC 
issue, there are numerous resources online how to analyse the cause of the 
problem. For that, it is difficult to use CPU profiling/Flink Metrics, since GC 
issues caused by one thread, can cause performance bottlenecks in other 
unrelated places.

If that’s not a GC issue, you can use Flink metrics (like number of buffered 
input/output data) to find Task that’s causing a bottleneck. Then you can use 
CPU profiler to analyse why is that happening.

Piotrek

> On 6 Mar 2019, at 02:52, Padarn Wilson <pad...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Thanks for your feedback. Makes sense about the checkpoint barriers - this 
> definitely could be the cause of a problem.
> 
> I would advice profiling your job to find out what’s going on.
> 
> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for 
> tools with which to do this?
> 
> The main thing I'm trying to pin down is:
> 1) Is it the downstream processing from the expansion of records that causes 
> a problem, or 
> 2) Is is the shuffle of all the records after the expansion which is taking a 
> large time - if so, is there anything I can do to mitigate this other than 
> trying to ensure less shuffle.
> 
> Thanks,
> Padarn
> 
> 
> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <pi...@ververica.com 
> <mailto:pi...@ververica.com>> wrote:
> Hi,
> 
>> Do you mind elaborating on this? What technology would you propose as an 
>> alternative, and why would this increase checkpointing time? 
> 
> The problem is that when Flink starts checkpoint and inject checkpoint 
> barriers, those checkpoint barriers travel through the Job Graph. The quicker 
> they can do that the better. How fast does it take depends on the amount of 
> buffered data before checkpoint barriers (currently all of such records must 
> be processed before checkpoint barrier is passed down stream). The more 
> buffered records and the more time it takes to process those records, the 
> longer the checkpoint take time. Obviously if one stage in the job is 
> multiplying the amount of records, it can in a way multiply the amount of 
> “buffered work” that needs to be processed before checkpoint barriers pass 
> through.
> 
> However it might not be the case for you. To analyse what’s going on you 
> would need to look at various Flink metrics, like checkpoint times, back 
> pressured tasks, state of the output/input buffers of the tasks, etc. However 
> #2, those are secondary issues. First of all you should try to pin point the 
> cause of long GC pauses. If it comes from your code, you should fix this 
> first. If that either isn’t the issue or doesn’t solve it, generally speaking 
> I would advice profiling your job to find out what’s going on.
> 
> Piotrek
> 
>> On 5 Mar 2019, at 02:00, Padarn Wilson <pad...@gmail.com 
>> <mailto:pad...@gmail.com>> wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to 1.7.2 
>> shortly - there is some S3 fix I'd like to take advantage of).
>> 
>> Generally speaking Flink might not the best if you have records fan out, 
>> this may significantly increase checkpointing time. 
>> 
>> Do you mind elaborating on this? What technology would you propose as an 
>> alternative, and why would this increase checkpointing time? 
>> 
>> However you might want to first identify what’s causing long GC times.
>> 
>> My current plan is to try and enable GC logs and see if I can get something 
>> meaningful from them. 
>> 
>> Thanks a lot,
>> Padarn 
>> 
>> 
>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <pi...@ververica.com 
>> <mailto:pi...@ververica.com>> wrote:
>> Hi,
>> 
>> What Flink version are you using?
>> 
>> Generally speaking Flink might not the best if you have records fan out, 
>> this may significantly increase checkpointing time. 
>> 
>> However you might want to first identify what’s causing long GC times. If 
>> there are long GC pause, this should be the first thing to fix.
>> 
>> Piotrek
>> 
>>> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com 
>>> <mailto:pad...@gmail.com>> wrote:
>>> 
>>> Hi all again - following up on this I think I've identified my problem as 
>>> being something else, but would appreciate if anyone can offer advice.
>>> 
>>> After running my stream from sometime, I see that my garbage collector for 
>>> old generation starts to take a very long time:
>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>> here the purple line is young generation time, this is ever increasing, but 
>>> grows slowly, while the blue is old generation.
>>> This in itself is not a problem, but as soon as the next checkpoint is 
>>> triggered after this happens you see the following:
>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>> It looks like the checkpoint hits a cap, but this is only because the 
>>> checkpoints start to timeout and fail (these are the alignment time per 
>>> operator)
>>> 
>>> I do notice that my state is growing quite larger over time, but I don't 
>>> have a good understanding of what would cause this to happen with the JVM 
>>> old generation metric, which appears to be the leading metric before a 
>>> problem is noticed. Other metrics such as network buffers also show that at 
>>> the checkpoint time things start to go haywire and the situation never 
>>> recovers.
>>> 
>>> Thanks
>>> 
>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com 
>>> <mailto:pad...@gmail.com>> wrote:
>>> Hi all,
>>> 
>>> I'm trying to process many records, and I have an expensive operation I'm 
>>> trying to optimize. Simplified it is something like:
>>> 
>>> Data: (key1, count, time)
>>> 
>>> Source -> Map(x -> (x, newKeyList(x.key1))
>>>             -> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time))
>>>             -> Keyby(_.key1).TublingWindow().apply..
>>>             -> Sink
>>> 
>>> In the Map -> Flatmap, what is happening is that each key is mapping to a 
>>> set of keys, and then this is set as the new key. This effectively increase 
>>> the size of the stream by 16x
>>> 
>>> What I am trying to figure out is how to set the parallelism of my 
>>> operators. I see in some comments that people suggest your source, sink and 
>>> aggregation should have different parallelism, but I'm not clear on exactly 
>>> why, or what this means for CPU utilization. 
>>> (see for example 
>>> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
>>>  
>>> <https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly>)
>>> 
>>> Also, it isn't clear to me the best way to handle this increase in data 
>>> within the stream itself.
>>> 
>>> Thanks
>> 
> 

Reply via email to