Hi Oliwer,

From the description, it seems the state is not being cleared. Maybe you
could check how many times {{windowState.clear()}} is triggered in
{{WindowOperator#processElement}}, and try to figure out why the state
is not cleared.
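
As a rough illustration of that bookkeeping, here is a plain-Java toy model.
The `CountingWindowState` class is hypothetical; it is not Flink's
`WindowOperator` or its state API, and only tracks how many windows
received data and how many were cleared. If per-window state is cleaned up
correctly, `created()` and `cleared()` should match once every window has
fired, and any difference is state that lingers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (not Flink's API): buffered elements per (key, window)
// plus counters for how many windows were created and how many were cleared.
final class CountingWindowState {
    private final Map<String, List<String>> contents = new HashMap<>();
    private int created = 0;
    private int cleared = 0;

    // Identifies one window of one key, e.g. "a@1000".
    private static String windowId(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    // Buffers an element; the first element of a window counts as a creation.
    void add(String key, long windowStart, String element) {
        String id = windowId(key, windowStart);
        List<String> buffer = contents.get(id);
        if (buffer == null) {
            buffer = new ArrayList<>();
            contents.put(id, buffer);
            created++;
        }
        buffer.add(element);
    }

    // Drops a window's buffer; only counts clears that actually removed state.
    void clear(String key, long windowStart) {
        if (contents.remove(windowId(key, windowStart)) != null) {
            cleared++;
        }
    }

    int created() { return created; }
    int cleared() { return cleared; }
    int liveWindows() { return contents.size(); }
}
```

For example, after adding elements to three windows and clearing two of
them, `liveWindows()` is 1 and `created() - cleared()` is 1: the same kind
of mismatch you would be looking for between window creations and
`clear()` calls.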

Best,
Congxian


On Fri, Sep 27, 2019 at 4:14 PM Oliwer Kostera <o.kost...@adbglobal.com> wrote:

> Hi all,
>
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following
> definition:
>
> final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>         keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>                 .process(new CustomProcessWindowFunction())
>                 .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>                 .name("Process window function");
>
> My checkpointing configuration uses the RocksDB state backend with
> incremental checkpointing and EXACTLY_ONCE mode.
>
> At runtime I noticed that even though data ingestion is static (same
> keys and same message frequency), the state size of the process window
> operator keeps increasing. I was able to reproduce this with a minimal,
> similar setup here:
> https://github.com/loliver1234/flink-process-window-function
>
> Testing conditions:
>
>    - RabbitMQ source with an exactly-once guarantee and a prefetch count
>    of 65k
>    - RabbitMQ sink to collect messages
>    - Simple ProcessWindowFunction that only passes messages through
>    - Stream time characteristic set to TimeCharacteristic.ProcessingTime
>
> Testing scenario:
>
>    - Start the Flink job and check the initial state size: 127 KB
>    - Start sending messages for the same 1000 unique keys every 1 s
>    (messages for a given key arrive 1 s apart, well beyond the 100 ms
>    session gap, so each message should create a new window)
>    - The state of the process window operator keeps increasing: after
>    1 million messages it reached around 2 MB
>    - Stop sending messages and wait until the RabbitMQ queue is fully
>    consumed and a few checkpoints have completed
>    - I expected the state size to decrease to the baseline value, but it
>    stayed at 2 MB
>    - Resume sending messages with the same keys: the state continues its
>    increasing trend
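
To make the expectation in this scenario concrete, here is a toy
plain-Java model. `SessionWindowSimulation` is hypothetical; it is neither
Flink's `WindowOperator` nor the code in the linked repository. It assumes
each key has at most one open processing-time session, and that a timer at
the window's last millisecond (end - 1) fires and purges that window. It
models logical window contents only, not how RocksDB or incremental
checkpoints store that state on disk.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical toy model of processing-time session windows (not Flink's code).
final class SessionWindowSimulation {
    // key -> exclusive end of that key's currently open session window
    private final Map<Integer, Long> openWindowEnd = new HashMap<>();
    // pending timers as {fireTime, key}, earliest first
    private final PriorityQueue<long[]> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    private int peakLiveWindows = 0;

    // An element opens the window [now, now + gap) or extends the open one.
    void onElement(int key, long now, long gapMs) {
        openWindowEnd.put(key, now + gapMs);
        timers.add(new long[] {now + gapMs - 1, key});
        peakLiveWindows = Math.max(peakLiveWindows, openWindowEnd.size());
    }

    // Fires every timer due at or before `now`.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek()[0] <= now) {
            long[] timer = timers.poll();
            int key = (int) timer[1];
            Long end = openWindowEnd.get(key);
            // Ignore stale timers whose window was extended by a later element.
            if (end != null && end == timer[0] + 1) {
                openWindowEnd.remove(key); // fire and purge: window state is dropped
            }
        }
    }

    int liveWindows() { return openWindowEnd.size(); }
    int peakLiveWindows() { return peakLiveWindows; }

    // `keys` keys, one message per key every `intervalMs`, for `rounds` rounds.
    static SessionWindowSimulation run(int keys, int rounds, long intervalMs, long gapMs) {
        SessionWindowSimulation sim = new SessionWindowSimulation();
        for (int round = 0; round < rounds; round++) {
            long now = round * intervalMs;
            sim.advanceTo(now);
            for (int key = 0; key < keys; key++) {
                sim.onElement(key, now, gapMs);
            }
        }
        sim.advanceTo(Long.MAX_VALUE); // let every pending timer fire
        return sim;
    }
}
```

Calling `SessionWindowSimulation.run(1000, 1000, 1000L, 100L)` mirrors the
1 million messages above: at most 1000 windows are alive at once, and
`liveWindows()` is 0 after the last timer fires, which is the baseline the
operator state was expected to return to.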
>
> What I checked:
>
>    - Registration and deregistration of the timers set for the windows:
>    each registration matched a deregistration
>    - Confirmed that there are in fact no window merges
>    - Tried a custom Trigger that disables window merges and returns
>    TriggerResult.FIRE_AND_PURGE from onProcessingTime: same state
>    behavior
>
> Tested with:
>
>    - A local Flink MiniCluster running from the IDE
>    - A Flink HA standalone cluster running in Docker
>
> In our staging environment, we noticed that the state of that operator
> keeps increasing indefinitely, reaching as much as 1.5 GB for 100k
> unique keys after a few months.
>
> With best regards
>
> Oliwer
> adbglobal.com <https://www.adbglobal.com>