Hi,

The query that you wrote is not a time-windowed join.

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.*
FROM sourceKafka
INNER JOIN badips ON sourceKafka.`source.ip` = badips.ip
WHERE sourceKafka.timestamp_received
      BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;

The problem is that the predicate uses CURRENT_TIMESTAMP instead of a
processing-time (or event-time) attribute of badips. Because the predicate
does not relate the time attributes of the two tables, the query is planned
as a regular join, whose state is not cleaned up automatically.

What exactly are you trying to achieve with the query?

Best, Fabian

On Wed, 18 Sept 2019 at 14:02, Nishant Gupta
<nishantgupta1...@gmail.com> wrote:

> Hi Team,
>
> I am running a query for a time-windowed join, as below:
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> With a time-windowed join, Flink SQL should automatically clear older
> records. Somehow the query does not clear the heap space and fails with an
> error after some time.
>
> Can you please let me know what could be going wrong, or whether this is
> an issue?
>
> Environment file chunks
>
> ----------------------------------------------------------------------
> tables:
>   - name: sourceKafka
>     type: source-table
>     update-mode: append
>     connector:
>       type: kafka
>       version: "universal"
>       topic: test-data-flatten
>       properties:
>         - key: zookeeper.connect
>           value: x.x.x.x:2181
>         - key: bootstrap.servers
>           value: x.x.x.x:9092
>         - key: group.id
>           value: testgroup
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'source.ip': {
>               type: 'string'
>             },
>             'source.port': {
>               type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>     schema:
>       - name: ' source.ip '
>         type: VARCHAR
>       - name: 'source.port'
>         type: VARCHAR
>
>   - name: sourceKafkaMalicious
>     type: sink-table
>     update-mode: append
>     connector:
>       type: kafka
>       version: "universal"
>       topic: test-data-mal
>       properties:
>         - key: zookeeper.connect
>           value: x.x.x.x:2181
>         - key: bootstrap.servers
>           value: x.x.x.x:9092
>         - key: group.id
>           value: testgroupmal
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'source.ip': {
>               type: 'string'
>             },
>             'source.port': {
>               type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>     schema:
>       - name: ' source.ip '
>         type: VARCHAR
>       - name: 'source.port'
>         type: VARCHAR
>
>   - name: badips
>     type: source-table
>     #update-mode: append
>     connector:
>       type: filesystem
>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>     format:
>       type: csv
>       fields:
>         - name: ip
>           type: VARCHAR
>       comment-prefix: "#"
>     schema:
>       - name: ip
>         type: VARCHAR
>
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 1000000
>   parallelism: 3
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   restart-strategy:
>     type: fallback
>
> configuration:
>   table.optimizer.join-reorder-enabled: true
>   table.exec.spill-compression.enabled: true
>   table.exec.spill-compression.block-size: 128kb
> Properties that describe the cluster to which table programs are
> submitted to.
>
> deployment:
>   response-timeout: 5000
>
> ----------------------------------------------------------------------
>
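
For comparison with the query quoted above, a time-windowed join bounds a time attribute of one table by a time attribute of the other table. The following is only a minimal sketch. It assumes that timestamp_received is declared as a time attribute of sourceKafka and that badips exposes a time attribute of the same kind, here given the hypothetical name proctime. Neither is defined in the environment file shown, so both would have to be added to the table schemas first.

```sql
-- Sketch only: `badips.proctime` is a hypothetical time attribute that is
-- NOT part of the posted environment file, and `timestamp_received` is
-- assumed to be declared as a time attribute of the same kind.
INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.*
FROM sourceKafka
INNER JOIN badips ON sourceKafka.`source.ip` = badips.ip
WHERE sourceKafka.timestamp_received
      BETWEEN badips.proctime - INTERVAL '15' MINUTE AND badips.proctime;
```

Whether this form fits depends on what the query is meant to achieve. badips is read from a static CSV file, so bounding the join by a time attribute of badips may not match the intended semantics.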