Hi,

This looks OK on the first sight.
Is it doing what you expect?

Fabian

Am Fr., 20. Sept. 2019 um 16:29 Uhr schrieb Nishant Gupta <
nishantgupta1...@gmail.com>:

> Hi Fabian,
>
> Thanks for the information.
> I have been reading about it and doing the same as a part of flink job
> written in Java
>
> I am using proctime for both the tables. Can you please verify once the
> implementation of temporal tables
>
>
> here is the snippet.
> ----------------------------
> public class StreamingJob {
>
> public static void main(String[] args) throws Exception {
>
> ParameterTool params = ParameterTool.fromArgs(args);
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> Properties kafkaConsumerProperties = new Properties();
>
> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
> "cg54");
> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "latest");
>
>
> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> DataStream<String> badipStream = env.addSource(new
> FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
> kafkaConsumerProperties));
>
> DataStream<String> badipStreamM =
> badipStream
> .map(new MapFunction<String, String>() {
>        private static final long serialVersionUID = -686775202L;
>        @Override
>        public String map(String value) throws Exception {
>         try {
>         String[] v = value.split("\\t");
>     if(v.length > 1) {
>     return v[0].toString();
>     } else
>     return "0.0.0.0";
>         } catch (Exception e) {
> System.err.println(e);
> return "0.0.0.0";
>         }
>        }
>    });
>
> Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
> r_proctime.proctime");*
>
> tableEnv.registerTable("BadIP", badipTable);
> TemporalTableFunction badIPTT =
> badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
> tableEnv.registerFunction("BadIPTT", badIPTT);
>
>
>
> DataStream<ObjectNode> inKafkaStream = env
> .addSource(new FlinkKafkaConsumer<>("tests", new
> JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
> DataStream<Tuple2<String,String>> inKafkaStreamM =
> inKafkaStream
> .rebalance()
> .filter(value -> value != null)
> .map(new MapFunction<ObjectNode, Tuple2<String,String>>() {
>        private static final long serialVersionUID = -6867120202L;
>        @Override
>        public Tuple2<String,String> map(ObjectNode node) throws Exception
> {
>         try {
>         ObjectNode nodeValue = (ObjectNode) node.get("value");
>             return new Tuple2<>(nodeValue.get("source.ip").asText(),
> nodeValue.get("destination.ip").asText());
>         } catch (Exception e) {
> System.err.println(e);
> System.out.println(node);
> return null;
>         }
>        }
>    });
>
> Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
> destinationIp, k_proctime.proctime"*);
> tableEnv.registerTable("KafkaSource", kafkaSource);
> * Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
> K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
> (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>
> TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>  Types.STRING(),
>  Types.STRING());
>
> DataStream<Tuple2<String,String>> outStreamMalicious =
> tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>
> Properties kafkaProducerProperties = new Properties();
>
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> ObjectMapper mapper = new ObjectMapper();
> DataStream<String> sinkStreamMaliciousData = outStreamMalicious
> .map(new MapFunction<Tuple2<String,String>,String>() {
> private static final long serialVersionUID = -6347120202L;
> @Override
> public String map(Tuple2<String,String> tuple) throws Exception {
> try {
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", tuple.f0);
> node.put("destination.ip", tuple.f1);
> return node.toString();
> } catch (Exception e) {
> System.err.println(e);
> System.out.println(tuple);
> return null;
> }
> }
> });
>
>
> sinkStreamMaliciousData.addSink(new
> FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(),
> kafkaProducerProperties));
> env.execute("Flink List Matching");
> }
> -------------------------------------------------------
>
> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Nishant,
>>
>> You should model the query as a join with a time-versioned table [1].
>> The bad-ips table would be the time-time versioned table [2].
>> Since it is a time-versioned table, it could even be updated with new IPs.
>>
>> This type of join will only keep the time-versioned table (the bad-ips in
>> state) and not the other (high-volume) table.
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>>
>> Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
>> nishantgupta1...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for your reply
>>> I have a continuous stream of kafka coming and static table of badips. I
>>> wanted to segregate records having bad ip.
>>>
>>> So therefore i was joining it. But with that 60 gb memory getting run out
>>>
>>> So i used below query.
>>> Can u please guide me in this regard
>>>
>>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> The query that you wrote is not a time-windowed join.
>>>>
>>>> INSERT INTO sourceKafkaMalicious
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>> sourceKafka.`source.ip`=badips.ip
>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>
>>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing
>>>> time (or event time) attribute of badips.
>>>>
>>>> What exactly are you trying to achieve with the query?
>>>>
>>>> Best, Fabian
>>>>
>>>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>>>> nishantgupta1...@gmail.com>:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am running a query for Time Window Join as below
>>>>>
>>>>> INSERT INTO sourceKafkaMalicious
>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>> sourceKafka.`source.ip`=badips.ip
>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>
>>>>> Time windowed join, Flink SQL should automatically clear older
>>>>> records, Some how the query does not clear the heapspace and fails
>>>>> with error after sometime.
>>>>>
>>>>> Can you please let me know what could go wrong, or is it a issue
>>>>>
>>>>> Environment File chunks
>>>>>
>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>> tables:
>>>>>   - name: sourceKafka
>>>>>     type: source-table
>>>>>     update-mode: append
>>>>>     connector:
>>>>>       type: kafka
>>>>>       version: "universal"
>>>>>       topic: test-data-flatten
>>>>>       properties:
>>>>>         - key: zookeeper.connect
>>>>>           value: x.x.x.x:2181
>>>>>         - key: bootstrap.servers
>>>>>           value: x.x.x.x:9092
>>>>>         - key: group.id
>>>>>           value: testgroup
>>>>>     format:
>>>>>       type: json
>>>>>       fail-on-missing-field: false
>>>>>       json-schema: >
>>>>>         {
>>>>>           type: 'object',
>>>>>           properties: {
>>>>>             'source.ip': {
>>>>>                type: 'string'
>>>>>             },
>>>>>             'source.port': {
>>>>>                type: 'string'
>>>>>             }
>>>>>           }
>>>>>         }
>>>>>       derive-schema: false
>>>>>     schema:
>>>>>       - name: ' source.ip '
>>>>>         type: VARCHAR
>>>>>       - name: 'source.port'
>>>>>         type: VARCHAR
>>>>>
>>>>>   - name: sourceKafkaMalicious
>>>>>     type: sink-table
>>>>>     update-mode: append
>>>>>     connector:
>>>>>       type: kafka
>>>>>       version: "universal"
>>>>>       topic: test-data-mal
>>>>>       properties:
>>>>>         - key: zookeeper.connect
>>>>>           value: x.x.x.x:2181
>>>>>         - key: bootstrap.servers
>>>>>           value: x.x.x.x:9092
>>>>>         - key: group.id
>>>>>           value: testgroupmal
>>>>>     format:
>>>>>       type: json
>>>>>       fail-on-missing-field: false
>>>>>       json-schema: >
>>>>>         {
>>>>>           type: 'object',
>>>>>           properties: {
>>>>>             'source.ip': {
>>>>>                type: 'string'
>>>>>             },
>>>>>             'source.port': {
>>>>>                type: 'string'
>>>>>             }
>>>>>           }
>>>>>         }
>>>>>       derive-schema: false
>>>>>     schema:
>>>>>       - name: ' source.ip '
>>>>>         type: VARCHAR
>>>>>       - name: 'source.port'
>>>>>         type: VARCHAR
>>>>>
>>>>>   - name: badips
>>>>>     type: source-table
>>>>>     #update-mode: append
>>>>>     connector:
>>>>>       type: filesystem
>>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>>     format:
>>>>>       type: csv
>>>>>       fields:
>>>>>         - name: ip
>>>>>           type: VARCHAR
>>>>>       comment-prefix: "#"
>>>>>     schema:
>>>>>       - name: ip
>>>>>         type: VARCHAR
>>>>>
>>>>> execution:
>>>>>   planner: blink
>>>>>   type: streaming
>>>>>   time-characteristic: event-time
>>>>>   periodic-watermarks-interval: 200
>>>>>   result-mode: table
>>>>>   max-table-result-rows: 1000000
>>>>>   parallelism: 3
>>>>>   max-parallelism: 128
>>>>>   min-idle-state-retention: 0
>>>>>   max-idle-state-retention: 0
>>>>>   restart-strategy:
>>>>>     type: fallback
>>>>>
>>>>> configuration:
>>>>>   table.optimizer.join-reorder-enabled: true
>>>>>   table.exec.spill-compression.enabled: true
>>>>>   table.exec.spill-compression.block-size: 128kb
>>>>>  Properties that describe the cluster to which table programs are
>>>>> submitted to.
>>>>>
>>>>> deployment:
>>>>>   response-timeout: 5000
>>>>>
>>>>>
>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>
>>>>

Reply via email to