Hi Nishant,

You should model the query as a join with a time-versioned table [1].
The bad-ips table would be the time-versioned table [2].
Since it is a time-versioned table, it could even be updated with new IPs.

This type of join will only keep the time-versioned table (the bad-ips) in
state, and not the other (high-volume) table.
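
A rough sketch of how this could look with the SQL Client (only a sketch,
untested: badipsTemporal, b_proctime, and s_proctime are placeholder names,
and both tables would need such a processing-time attribute in their schema,
which your posted excerpt does not define yet):

  # environment file: expose badips as a temporal table over its history
  tables:
    - name: badipsTemporal
      type: temporal-table
      history-table: badips
      primary-key: ip
      time-attribute: b_proctime

  -- query: join against the temporal table instead of using CURRENT_TIMESTAMP
  INSERT INTO sourceKafkaMalicious
  SELECT s.*
  FROM sourceKafka AS s,
    LATERAL TABLE (badipsTemporal(s.s_proctime)) AS b
  WHERE s.`source.ip` = b.ip;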

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html

On Wed, 18 Sep 2019 at 2:34 PM, Nishant Gupta <nishantgupta1...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for your reply.
> I have a continuous Kafka stream coming in and a static table of bad IPs. I
> want to segregate the records that have a bad IP.
>
> That is why I was joining them, but with that join the 60 GB of memory gets
> used up.
>
> So I used the query below.
> Can you please guide me in this regard?
>
> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> The query that you wrote is not a time-windowed join.
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>> sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
>> '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>> (or event time) attribute of badips.
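>>
>> A time-windowed join has to bound the time attribute of one input by the
>> time attribute of the other, roughly like this (only a sketch, assuming
>> badips exposes a time attribute, here an illustrative update_time column):
>>
>> SELECT s.*
>> FROM sourceKafka s, badips b
>> WHERE s.`source.ip` = b.ip
>>   AND s.timestamp_received BETWEEN b.update_time - INTERVAL '15' MINUTE
>>                                AND b.update_time;
>>
>> Because CURRENT_TIMESTAMP is not a time attribute of either input, the
>> query is executed as a regular join that keeps both inputs in state.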
>>
>> What exactly are you trying to achieve with the query?
>>
>> Best, Fabian
>>
>> On Wed, 18 Sep 2019 at 2:02 PM, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I am running a query for a time-windowed join, as below:
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> Since this is a time-windowed join, Flink SQL should automatically clear
>>> older records. However, the query does not clear the heap space and fails
>>> with an error after some time.
>>>
>>> Can you please let me know what could be going wrong, or whether this is an issue?
>>>
>>> Environment file (relevant chunks):
>>>
>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>> tables:
>>>   - name: sourceKafka
>>>     type: source-table
>>>     update-mode: append
>>>     connector:
>>>       type: kafka
>>>       version: "universal"
>>>       topic: test-data-flatten
>>>       properties:
>>>         - key: zookeeper.connect
>>>           value: x.x.x.x:2181
>>>         - key: bootstrap.servers
>>>           value: x.x.x.x:9092
>>>         - key: group.id
>>>           value: testgroup
>>>     format:
>>>       type: json
>>>       fail-on-missing-field: false
>>>       json-schema: >
>>>         {
>>>           type: 'object',
>>>           properties: {
>>>             'source.ip': {
>>>                type: 'string'
>>>             },
>>>             'source.port': {
>>>                type: 'string'
>>>             }
>>>           }
>>>         }
>>>       derive-schema: false
>>>     schema:
>>>       - name: ' source.ip '
>>>         type: VARCHAR
>>>       - name: 'source.port'
>>>         type: VARCHAR
>>>
>>>   - name: sourceKafkaMalicious
>>>     type: sink-table
>>>     update-mode: append
>>>     connector:
>>>       type: kafka
>>>       version: "universal"
>>>       topic: test-data-mal
>>>       properties:
>>>         - key: zookeeper.connect
>>>           value: x.x.x.x:2181
>>>         - key: bootstrap.servers
>>>           value: x.x.x.x:9092
>>>         - key: group.id
>>>           value: testgroupmal
>>>     format:
>>>       type: json
>>>       fail-on-missing-field: false
>>>       json-schema: >
>>>         {
>>>           type: 'object',
>>>           properties: {
>>>             'source.ip': {
>>>                type: 'string'
>>>             },
>>>             'source.port': {
>>>                type: 'string'
>>>             }
>>>           }
>>>         }
>>>       derive-schema: false
>>>     schema:
>>>       - name: ' source.ip '
>>>         type: VARCHAR
>>>       - name: 'source.port'
>>>         type: VARCHAR
>>>
>>>   - name: badips
>>>     type: source-table
>>>     #update-mode: append
>>>     connector:
>>>       type: filesystem
>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>     format:
>>>       type: csv
>>>       fields:
>>>         - name: ip
>>>           type: VARCHAR
>>>       comment-prefix: "#"
>>>     schema:
>>>       - name: ip
>>>         type: VARCHAR
>>>
>>> execution:
>>>   planner: blink
>>>   type: streaming
>>>   time-characteristic: event-time
>>>   periodic-watermarks-interval: 200
>>>   result-mode: table
>>>   max-table-result-rows: 1000000
>>>   parallelism: 3
>>>   max-parallelism: 128
>>>   min-idle-state-retention: 0
>>>   max-idle-state-retention: 0
>>>   restart-strategy:
>>>     type: fallback
>>>
>>> configuration:
>>>   table.optimizer.join-reorder-enabled: true
>>>   table.exec.spill-compression.enabled: true
>>>   table.exec.spill-compression.block-size: 128kb
>>> # Properties that describe the cluster to which table programs are submitted.
>>>
>>> deployment:
>>>   response-timeout: 5000
>>>
>>>
>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>
>>
