Hi Nishant,

You should model the query as a join with a time-versioned table [1].
The bad-ips table would be the time-versioned table [2]. Since it is a
time-versioned table, it could even be updated with new IPs.
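A minimal, untested sketch of what the query could look like
(badipsTemporal is a placeholder name for a temporal table function
defined over badips, and it assumes timestamp_received is declared as a
time attribute of sourceKafka, which your current schema does not do yet):

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.*
FROM sourceKafka,
  LATERAL TABLE (badipsTemporal(sourceKafka.timestamp_received))
WHERE sourceKafka.`source.ip` = ip;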
This type of join will only keep the time-versioned table (the bad-ips)
in state, not the other (high-volume) table.
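Since you are using the SQL Client, the temporal table can be declared
in the environment file. A rough sketch with placeholder names
(badipsHistory would be an append-only history table of the IPs with a
time attribute named rowtime; the current CSV source does not define a
time attribute, so it would need one):

tables:
  - name: badipsTemporal
    type: temporal-table
    history-table: badipsHistory
    primary-key: ip
    time-attribute: rowtime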
Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html

On Wed, 18 Sep 2019 at 14:34, Nishant Gupta <nishantgupta1...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for your reply.
> I have a continuous Kafka stream coming in and a static table of bad IPs.
> I wanted to segregate the records that have a bad IP, so I was joining
> the two tables. But with that, the 60 GB of memory gets used up.
>
> So I used the query below.
> Can you please guide me in this regard?
>
> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> The query that you wrote is not a time-windowed join.
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> The problem is the use of CURRENT_TIMESTAMP instead of a processing-time
>> (or event-time) attribute of badips.
>>
>> What exactly are you trying to achieve with the query?
>>
>> Best,
>> Fabian
>>
>> On Wed, 18 Sep 2019 at 14:02, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I am running a query for a time-windowed join as below:
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> For a time-windowed join, Flink SQL should automatically clear older
>>> records. Somehow the query does not clear the heap space and fails with
>>> an error after some time.
>>>
>>> Can you please let me know what could go wrong, or is it an issue?
>>>
>>> Environment file chunks:
>>>
>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>> tables:
>>>   - name: sourceKafka
>>>     type: source-table
>>>     update-mode: append
>>>     connector:
>>>       type: kafka
>>>       version: "universal"
>>>       topic: test-data-flatten
>>>       properties:
>>>         - key: zookeeper.connect
>>>           value: x.x.x.x:2181
>>>         - key: bootstrap.servers
>>>           value: x.x.x.x:9092
>>>         - key: group.id
>>>           value: testgroup
>>>     format:
>>>       type: json
>>>       fail-on-missing-field: false
>>>       json-schema: >
>>>         {
>>>           type: 'object',
>>>           properties: {
>>>             'source.ip': {
>>>               type: 'string'
>>>             },
>>>             'source.port': {
>>>               type: 'string'
>>>             }
>>>           }
>>>         }
>>>       derive-schema: false
>>>     schema:
>>>       - name: 'source.ip'
>>>         type: VARCHAR
>>>       - name: 'source.port'
>>>         type: VARCHAR
>>>
>>>   - name: sourceKafkaMalicious
>>>     type: sink-table
>>>     update-mode: append
>>>     connector:
>>>       type: kafka
>>>       version: "universal"
>>>       topic: test-data-mal
>>>       properties:
>>>         - key: zookeeper.connect
>>>           value: x.x.x.x:2181
>>>         - key: bootstrap.servers
>>>           value: x.x.x.x:9092
>>>         - key: group.id
>>>           value: testgroupmal
>>>     format:
>>>       type: json
>>>       fail-on-missing-field: false
>>>       json-schema: >
>>>         {
>>>           type: 'object',
>>>           properties: {
>>>             'source.ip': {
>>>               type: 'string'
>>>             },
>>>             'source.port': {
>>>               type: 'string'
>>>             }
>>>           }
>>>         }
>>>       derive-schema: false
>>>     schema:
>>>       - name: 'source.ip'
>>>         type: VARCHAR
>>>       - name: 'source.port'
>>>         type: VARCHAR
>>>
>>>   - name: badips
>>>     type: source-table
>>>     #update-mode: append
>>>     connector:
>>>       type: filesystem
>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>     format:
>>>       type: csv
>>>       fields:
>>>         - name: ip
>>>           type: VARCHAR
>>>       comment-prefix: "#"
>>>     schema:
>>>       - name: ip
>>>         type: VARCHAR
>>>
>>> execution:
>>>   planner: blink
>>>   type: streaming
>>>   time-characteristic: event-time
>>>   periodic-watermarks-interval: 200
>>>   result-mode: table
>>>   max-table-result-rows: 1000000
>>>   parallelism: 3
>>>   max-parallelism: 128
>>>   min-idle-state-retention: 0
>>>   max-idle-state-retention: 0
>>>   restart-strategy:
>>>     type: fallback
>>>
>>> configuration:
>>>   table.optimizer.join-reorder-enabled: true
>>>   table.exec.spill-compression.enabled: true
>>>   table.exec.spill-compression.block-size: 128kb
>>>
>>> # Properties that describe the cluster to which table programs are submitted.
>>>
>>> deployment:
>>>   response-timeout: 5000
>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>