If you have a window larger than a few hours, you need to rethink your architecture - this is not really streaming anymore. Just because you receive the events in a streaming fashion doesn't mean you have to do all of the processing in a streaming fashion. Could you store the events in a file or a database and then run batch processing on them after the 30 days have passed?
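A minimal sketch of what that split could look like (topic name, paths and the choice of BucketingSink plus the DataSet API are just assumptions for illustration, not taken from your job): one small streaming job that only archives the raw JSON payload, and a separate batch program that you run once the 30-day period is over to do the deduplication offline.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

// --- Job 1: long-running streaming job that only archives the raw events ---
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder broker
kafkaProps.setProperty("group.id", "event-archiver");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), kafkaProps))
   // keep the JSON as-is; no 30+ day keyed window, so no huge RocksDB state
   .addSink(new BucketingSink<String>("hdfs:///archive/events"));
env.execute("archive-events");

// --- Job 2: separate batch program, run after the 30 days have passed ---
ExecutionEnvironment batch = ExecutionEnvironment.getExecutionEnvironment();
batch.readTextFile("hdfs:///archive/events")
     // exact-duplicate removal; for key-based dedup you would parse the JSON
     // and groupBy the (A,B,C) fields instead of calling distinct()
     .distinct()
     .writeAsText("hdfs:///results/deduplicated");
batch.execute("dedup-and-aggregate");

That way the expensive 30-day deduplication state never has to live in RocksDB inside the streaming job, and the hourly/daily aggregations can run over the already deduplicated data.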
Another aspect could also be to investigate why your source sends duplicated entries in the first place.

> On 27. Aug 2018, at 04:30, Ning Shi <nings...@gmail.com> wrote:
>
> The application consumes from a single Kafka topic, deserializes the
> JSON payload into POJOs and uses a big keyed window (30+ days) for
> deduplication, then emits the result for every single event to four
> other keyed windows for aggregation. It looks roughly like the
> following.
>
> Source->KeyBy(A,B,C)
>   |
>   |                                -->KeyBy(A,B,C)->Hourly Window(sum)
>   v                                |->KeyBy(A,B,C)->Daily Window(sum)
> Big Window(sum, emit per event) ---|
>                                    |->KeyBy(D,E)->Hourly Window(sum)
>                                    -->KeyBy(D,E)->Daily Window(sum)
>
> The cardinality for the window keyed on (A,B,C) is high, could be in the
> millions. The values (A,B,C) are all strings.
>
> I'm doing performance testing by letting the application consume the
> past 7 days of data from Kafka. However, the performance is not good and
> I'm having some trouble interpreting the results. All tests were done on
> AWS using i3.xlarge with 2 slots per TM. This was tested with one,
> three, and six TMs. Parallelism was set to the same as the total number
> of slots available, e.g. 6 for 3 nodes with 2 slots per TM.
>
> - The application would always start at consuming ~500 messages/s from
>   Kafka for about 20 - 30 minutes, then jump to ~5,000 messages/s. I
>   noticed that the disk I/O would reduce noticeably when the performance
>   jumped.
>
> - Regardless of the number of TMs used, it always peaked at ~5,000
>   messages/s and had the same behavior as described above.
>
> - In the Flink UI, it always showed that the Source was back pressured by
>   the Big window when the performance was at ~500 messages/s, and no
>   back pressure at all once the performance reached ~5,000 messages/s.
>
> - I took some Flight Recorder recordings and they showed that the time
>   trigger thread of the Big window was always doing
>   SystemProcessingTimeService$TriggerTask.run(). Since I'm only
>   triggering the Big window by count of events, why would this be
>   running?
>
> - Flight Recorder also showed that the Big window thread was either
>   doing RocksDB writes or gets most of the time when the performance was
>   low. I understand that it keeps the state in RocksDB, but I wasn't
>   expecting it to tank the performance like this.
>
> - Flight Recorder showed that the hottest methods were all about Kryo
>   serialization.
>
> - GC was OK, nothing longer than 20ms and there weren't a lot of pauses.
>
> My questions are:
>
> - Why is the performance so bad and why didn't it scale as I increased
>   the number of TMs?
>
> - Why would the performance jump suddenly after 20 minutes or so?
>
> - I know the JSON and POJO serialization is not great. Could it be this
>   bad?
>
> Any insights or guidance on how I can diagnose the issue further will be
> greatly appreciated.
>
> Thanks,
>
> Ning