Hi,

The query that you wrote is not a time-windowed join.

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

The problem is the use of CURRENT_TIMESTAMP instead of a processing-time
(or event-time) attribute of badips. Because CURRENT_TIMESTAMP is not a time
attribute of either input, the planner treats this as a regular join and has
to keep all rows of both inputs in state, which would explain the growing heap.
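
For reference, here is a minimal sketch of an interval join, assuming
(hypothetically) that both inputs carried an event-time attribute named ts.
badips as defined in your environment file has no time attribute, so this
only illustrates the required shape of the predicate:

-- Hypothetical interval join; the time attributes s.ts and b.ts are
-- assumptions, not columns of your actual tables.
SELECT s.*
FROM sourceKafka s
INNER JOIN badips b ON s.`source.ip` = b.ip
WHERE s.ts BETWEEN b.ts - INTERVAL '15' MINUTE AND b.ts;

With bounds on time attributes of both inputs, the planner can limit the
join state and automatically evict expired rows.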

What exactly are you trying to achieve with the query?

Best, Fabian

On Wed, Sep 18, 2019 at 2:02 PM Nishant Gupta <
nishantgupta1...@gmail.com> wrote:

> Hi Team,
>
> I am running a query for a time-windowed join, as shown below:
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> For a time-windowed join, Flink SQL should automatically clear older
> records. Somehow the query does not clear the heap space and fails with an
> error after some time.
>
> Can you please let me know what could be going wrong, or whether this is an issue?
>
> Environment File chunks
>
> --------------------------------------------------------------------------------------------------------------------------------------------------------------
> tables:
>   - name: sourceKafka
>     type: source-table
>     update-mode: append
>     connector:
>       type: kafka
>       version: "universal"
>       topic: test-data-flatten
>       properties:
>         - key: zookeeper.connect
>           value: x.x.x.x:2181
>         - key: bootstrap.servers
>           value: x.x.x.x:9092
>         - key: group.id
>           value: testgroup
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'source.ip': {
>                type: 'string'
>             },
>             'source.port': {
>                type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>     schema:
>       - name: 'source.ip'
>         type: VARCHAR
>       - name: 'source.port'
>         type: VARCHAR
>
>   - name: sourceKafkaMalicious
>     type: sink-table
>     update-mode: append
>     connector:
>       type: kafka
>       version: "universal"
>       topic: test-data-mal
>       properties:
>         - key: zookeeper.connect
>           value: x.x.x.x:2181
>         - key: bootstrap.servers
>           value: x.x.x.x:9092
>         - key: group.id
>           value: testgroupmal
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'source.ip': {
>                type: 'string'
>             },
>             'source.port': {
>                type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>     schema:
>       - name: 'source.ip'
>         type: VARCHAR
>       - name: 'source.port'
>         type: VARCHAR
>
>   - name: badips
>     type: source-table
>     #update-mode: append
>     connector:
>       type: filesystem
>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>     format:
>       type: csv
>       fields:
>         - name: ip
>           type: VARCHAR
>       comment-prefix: "#"
>     schema:
>       - name: ip
>         type: VARCHAR
>
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 1000000
>   parallelism: 3
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   restart-strategy:
>     type: fallback
>
> configuration:
>   table.optimizer.join-reorder-enabled: true
>   table.exec.spill-compression.enabled: true
>   table.exec.spill-compression.block-size: 128kb
> # Properties that describe the cluster to which table programs are
> # submitted.
>
> deployment:
>   response-timeout: 5000
>
>
> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>
