Re-adding user mailing list.
Hi,

If it is a GC issue, only GC logs or a JVM memory profiler (like Oracle's Mission Control) can lead you to the solution. Once you have confirmed that it is a GC issue, there are numerous resources online on how to analyse the cause of the problem. CPU profiling and Flink metrics are of limited use for that, since GC issues caused by one thread can create performance bottlenecks in other, unrelated places.

If it is not a GC issue, you can use Flink metrics (such as the amount of buffered input/output data) to find the task that is causing the bottleneck, and then use a CPU profiler to analyse why that is happening.
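For example, on a Java 8 JVM you can turn on GC logging for the Flink JVMs via env.java.opts in flink-conf.yaml - a minimal sketch, where the exact flags and the log path are only placeholders to adapt to your setup:

  # flink-conf.yaml (sketch - adjust flags and log path to your environment)
  env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/path/to/gc.log"

The resulting log can then be fed into any of the usual GC log analysers, or you can simply look for long pauses around the times when the checkpoints start to fail.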
Piotrek

> On 6 Mar 2019, at 02:52, Padarn Wilson <pad...@gmail.com> wrote:
>
> Hi Piotr,
>
> Thanks for your feedback. Makes sense about the checkpoint barriers - this definitely could be the cause of a problem.
>
>> I would advise profiling your job to find out what's going on.
>
> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for tools with which to do this?
>
> The main thing I'm trying to pin down is:
> 1) Is it the downstream processing from the expansion of records that causes a problem, or
> 2) Is it the shuffle of all the records after the expansion that is taking a long time - if so, is there anything I can do to mitigate this other than trying to ensure less shuffling?
>
> Thanks,
> Padarn
>
> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
>
>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>
> The problem is that when Flink starts a checkpoint and injects the checkpoint barriers, those barriers have to travel through the job graph, and the quicker they can do that the better. How long that takes depends on the amount of data buffered ahead of the checkpoint barriers (currently all such records must be processed before a checkpoint barrier is passed downstream). The more buffered records there are, and the more time it takes to process them, the longer the checkpoint takes. Obviously, if one stage in the job multiplies the number of records, it can in a way multiply the amount of "buffered work" that needs to be processed before the checkpoint barriers pass through.
>
> However, it might not be the case for you. To analyse what's going on you would need to look at various Flink metrics, like checkpoint times, back-pressured tasks, the state of the tasks' input/output buffers, etc. That said, those are secondary issues. First of all you should try to pinpoint the cause of the long GC pauses. If it comes from your code, you should fix that first. If that either isn't the issue or doesn't solve it, then generally speaking I would advise profiling your job to find out what's going on.
>
> Piotrek
>
>> On 5 Mar 2019, at 02:00, Padarn Wilson <pad...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your response. I am using Flink 1.7.1 (I intend to upgrade to 1.7.2 shortly - there is an S3 fix I'd like to take advantage of).
>>
>>> Generally speaking, Flink might not be the best fit if you have records fanning out; this may significantly increase checkpointing time.
>>
>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>>
>>> However, you might want to first identify what's causing the long GC times.
>>
>> My current plan is to try to enable GC logs and see if I can get something meaningful from them.
>>
>> Thanks a lot,
>> Padarn
>>
>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>>
>> What Flink version are you using?
>>
>> Generally speaking, Flink might not be the best fit if you have records fanning out; this may significantly increase checkpointing time.
>>
>> However, you might want to first identify what's causing the long GC times. If there are long GC pauses, that should be the first thing to fix.
>>
>> Piotrek
>>
>>> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>> Hi all again - following up on this, I think I've identified my problem as being something else, but would appreciate it if anyone can offer advice.
>>>
>>> After running my stream for some time, I see that garbage collection for the old generation starts to take a very long time:
>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>> Here the purple line is young-generation time, which is ever increasing but grows slowly, while the blue is the old generation.
>>> This in itself is not a problem, but as soon as the next checkpoint is triggered after this happens you see the following:
>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>> It looks like the checkpoint hits a cap, but this is only because the checkpoints start to time out and fail (these are the alignment times per operator).
>>>
>>> I do notice that my state grows quite large over time, but I don't have a good understanding of what would cause this to happen with the JVM old-generation metric, which appears to be the leading metric before a problem is noticed. Other metrics such as network buffers also show that at checkpoint time things start to go haywire and the situation never recovers.
>>>
>>> Thanks
>>>
>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com> wrote:
>>> Hi all,
>>>
>>> I'm trying to process many records, and I have an expensive operation I'm trying to optimize. Simplified, it is something like:
>>>
>>> Data: (key1, count, time)
>>>
>>> Source -> Map(x -> (x, newKeyList(x.key1)))
>>>        -> FlatMap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
>>>        -> KeyBy(_.key1).TumblingWindow().apply..
>>>        -> Sink
>>>
>>> In the Map -> FlatMap, what is happening is that each key is mapped to a set of keys, and each of these is then set as the new key. This effectively increases the size of the stream by 16x.
>>>
>>> What I am trying to figure out is how to set the parallelism of my operators. I see in some comments that people suggest your source, sink and aggregation should have different parallelism, but I'm not clear on exactly why, or what this means for CPU utilization.
>>> (see for example https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly)
>>>
>>> Also, it isn't clear to me what the best way is to handle this increase in data within the stream itself.
>>>
>>> Thanks
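PS regarding the original question about where to set the parallelism of the individual operators: below is a rough, untested sketch of the pipeline described above, written against the Flink 1.7 Scala DataStream API. The record shape, the 16-key expansion in newKeyList, the window size and the parallelism values are only placeholders for illustration, and a simple reduce stands in for the window apply() in the original pipeline. The point is just to show that setParallelism() can be attached per operator, so the fan-out stage and the windowed aggregation can be scaled independently of the source and the sink.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative record shape, mirroring the (key1, count, time) tuples above.
case class Event(key1: String, count: Long, time: Long)

object FanOutSketch {

  // Placeholder for the real key expansion: here every key simply maps to 16 derived keys.
  def newKeyList(key: String): Seq[String] = (0 until 16).map(i => s"$key-$i")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in source so the sketch is self-contained; the real job would use its actual source.
    val source: DataStream[Event] =
      env.fromElements(Event("a", 1L, 1000L), Event("b", 2L, 2000L))

    // Fan-out stage (the original Map + FlatMap collapsed into one flatMap):
    // each record is expanded into one record per derived key, so ~16x more records.
    val expanded: DataStream[Event] = source
      .flatMap(e => newKeyList(e.key1).map(k => Event(k, e.count, e.time)))
      .setParallelism(8) // example value, tuned separately from the source

    // Shuffle on the new key and aggregate per tumbling window (the expensive part after the fan-out).
    val aggregated: DataStream[Event] = expanded
      .keyBy(_.key1)
      .timeWindow(Time.minutes(1))
      .reduce((a, b) => Event(a.key1, a.count + b.count, math.max(a.time, b.time)))
      .setParallelism(16) // example value, typically higher for the heavy aggregation

    // Stand-in sink.
    aggregated.print()

    env.execute("fan-out parallelism sketch")
  }
}

Whether raising the parallelism of those two operators actually helps depends on where the bottleneck is (CPU in the window function versus the network shuffle after the fan-out), which is again what the back-pressure and buffer metrics should tell you.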