If you need a window larger than a few hours, you should rethink your
architecture - this is not really streaming anymore. Just because you receive
events in a streamed fashion doesn't mean you have to do all the processing
in a streamed fashion.
Could you store the events in a file or a database and then run batch
processing on them after the 30 days?
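
Just to make the idea concrete, here is a minimal sketch of the "archive
first" part (class name, topic, bucket path and the Kafka connector version
are all placeholders, and it assumes Flink 1.6+ where StreamingFileSink is
available):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class RawEventArchiver {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink only finalizes its part files on checkpoints.
        env.enableCheckpointing(60_000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        kafkaProps.setProperty("group.id", "raw-event-archiver");  // placeholder

        // Read the raw JSON from Kafka and write it straight out to files,
        // instead of holding 30+ days of window state in RocksDB.
        env.addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), kafkaProps))
           .addSink(StreamingFileSink
               .forRowFormat(new Path("s3://my-bucket/raw-events"),      // placeholder path
                             new SimpleStringEncoder<String>("UTF-8"))
               .build());

        env.execute("archive-raw-events");
    }
}

The 30-day deduplication and the hourly/daily sums could then run later as an
ordinary batch job over the archived files (for example a DataSet program
doing distinct() on (A,B,C) followed by groupBy()/sum()), so none of that
state has to sit in RocksDB for weeks.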

Another aspect worth investigating is why your source sends duplicate
entries in the first place.

> On 27. Aug 2018, at 04:30, Ning Shi <nings...@gmail.com> wrote:
> 
> The application consumes from a single Kafka topic, deserializes the
> JSON payload into POJOs and uses a big keyed window (30+ days) for
> deduplication, then emits the result for every single event to four
> other keyed windows for aggregation. It looks roughly like the
> following.
> 
> Source->KeyBy(A,B,C)
>           |
>           |                    -->KeyBy(A,B,C)->Hourly Window(sum)
>           v                    |->KeyBy(A,B,C)->Daily Window(sum)
> Big Window(sum, emit per event) -|
>                                |->KeyBy(D,E)->Hourly Window(sum)
>                                -->KeyBy(D,E)->Daily Window(sum)
> 
> The cardinality of the window keyed on (A,B,C) is high, potentially in
> the millions. The values A, B, and C are all strings.
> 
> I'm doing performance testing by letting the application consume the
> past 7 days' data from Kafka. However, the performance is not good and
> I'm having some trouble interpreting the results. All tests were done on
> AWS using i3.xlarge with 2 slots per TM. This was tested with one,
> three, and six TMs. Parallelism was set to the same as the total number
> of slots available, e.g. 6 for 3 nodes with 2 slots per TM.
> 
> - The application would always start out consuming ~500 messages/s from
>  Kafka for about 20 - 30 minutes, then jump to ~5,000 messages/s. I
>  noticed that the disk I/O would drop noticeably when the performance
>  jumped.
> 
> - Regardless of the number of TMs used, it always peaked at ~5,000
>  messages/s and had the same behavior as described above.
> 
> - In the Flink UI, it always showed that the Source was back pressured
>  by the Big window when the performance was at ~500 messages/s, and no
>  back pressure at all once the performance reached ~5,000 messages/s.
> 
> - I took some Flight Recorder recordings and they showed that the Big
>  window's time trigger thread was always doing
>  SystemProcessingTimeService$TriggerTask.run(). Since I'm only
>  triggering the Big window by event count, why would this be
>  running?
> 
> - Flight Recorder also showed that the Big window thread spent most of
>  its time doing RocksDB writes or gets when the performance was low. I
>  understand that it keeps its state in RocksDB, but I wasn't expecting
>  it to tank the performance like this.
> 
> - Flight Recorder showed that the hottest methods were all about Kryo
>  serialization.
> 
> - GC was ok: no pause longer than 20ms and there weren't many of them.
> 
> My questions are
> 
> - Why is the performance so bad, and why didn't it scale as I increased
>  the number of TMs?
> 
> - Why would the performance jump suddenly after 20 minutes or so?
> 
> - I know the JSON and POJO serialization is not great. Could it be this
>  bad?
> 
> Any insights or guidance on how I can diagnose the issue further will be
> greatly appreciated.
> 
> Thanks,
> 
> Ning
