Hi all,

I'm using ProcessWindowFunction in a keyed stream with the following definition:

final SingleOutputStreamOperator<Message> processWindowFunctionStream =
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
                   .process(new CustomProcessWindowFunction())
                   .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
                   .name("Process window function");


My checkpointing configuration uses the RocksDB state backend with incremental 
checkpointing and EXACTLY_ONCE mode.
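
For reference, the relevant setup looks roughly like this (the checkpoint interval 
and path below are placeholders, not the exact values from the job):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// EXACTLY_ONCE checkpoints; the interval here is a placeholder.
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

// RocksDB state backend; the second argument enables incremental checkpoints.
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));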

At runtime I noticed that even though data ingestion is static - same keys and 
same message frequency - the state size of the process window operator keeps 
increasing. I tried to reproduce it with a minimal similar setup here: 
https://github.com/loliver1234/flink-process-window-function and was able to 
do so.

Testing conditions:

  *   RabbitMQ source with Exactly-once guarantee and 65k prefetch count
  *   RabbitMQ sink to collect messages
  *   A simple ProcessWindowFunction that only passes messages through (see the 
sketch after this list)
  *   Stream time characteristic set to TimeCharacteristic.ProcessingTime
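
The function itself is just a pass-through, roughly like this (the key type 
String is an assumption for the sketch; Message is the job's message type):

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CustomProcessWindowFunction
        extends ProcessWindowFunction<Message, Message, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Message> elements,
                        Collector<Message> out) {
        // No per-key or per-window state is used; elements are forwarded as-is.
        for (Message message : elements) {
            out.collect(message);
        }
    }
}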

Testing scenario:

  *   Start the Flink job and check the initial state size - State Size: 127 KB
  *   Start sending messages: the same 1000 unique keys every 1 s (the messages do 
not fall within the 100 ms session gap, so each message should create a new window)
  *   The state of the process window operator keeps increasing - after 1 million 
messages it ended up at around 2 MB
  *   Stop sending messages and wait until the RabbitMQ queue is fully consumed and 
a few checkpoints go by
  *   Expected the state size to drop back to the base value, but it stayed at 2 MB
  *   Continued sending messages with the same keys, and the state kept its 
increasing trend

What I checked:

  *   Registration and deregistration of the timers set for the time windows - each 
registration matched its deregistration
  *   Checked that there are in fact no window merges
  *   Tried a custom Trigger that disables window merging and returns 
TriggerResult.FIRE_AND_PURGE from onProcessingTime (see the sketch after this 
list) - same state behavior
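
The trigger I tried was along these lines (a sketch, not the exact class from the 
repo, and the class name is made up; note that Flink only accepts a trigger whose 
canMerge() returns false together with a non-merging window assigner):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FireAndPurgeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Fire once processing time passes the end of the window.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // Emit the window contents and purge the window state in one step.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Processing-time job, event-time timers are not used.
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        // Declare that this trigger does not support window merging.
        return false;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

It was attached to the windowed stream with .trigger(new FireAndPurgeTrigger()).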

Tested with:

  *   Local Flink MiniCluster run from the IDE
  *   Flink HA standalone cluster run in Docker

On our staging environment, we noticed that the state of that operator keeps 
increasing indefinitely, after some months reaching as much as 1.5 GB for 100k 
unique keys.

With best regards

Oliwer
