Hi there,

I'm using Flink SQL for a streaming job. Following the query configuration docs
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html>,
I configured the idle state retention time (in milliseconds).

*context*: Flink SQL reads a Kafka stream with no key into a dynamic
table (sourceKafka). There is another, static table (badips) loaded from a
file, and the dynamic table is joined against the static one like this:

    SELECT sourceKafka.*
    FROM sourceKafka
    INNER JOIN badips ON sourceKafka.source.ip = badips.ip
    WHERE sourceKafka.timestamp_received
      BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;

As the docs describe it, idle state retention works per key, but my query
has no GROUP BY for it to act upon and evict untouched state. So how do I
evict the older records?

Questions:

   1. If the 'ip' does act as a key in my query so that eviction can work,
   why did the heap still grow to 80 GB until the job crashed?
   2. Is it true that for every query with a time-windowed join, Flink SQL
   automatically clears older records that have become irrelevant?


Thanks
Srikanth
