Hi,

This looks OK at first sight. Is it doing what you expect?

Fabian

On Fri, 20 Sept 2019 at 16:29, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
> Hi Fabian,
>
> Thanks for the information.
> I have been reading about it and am doing the same as part of a Flink job
> written in Java.
>
> I am using proctime for both tables. Can you please verify the
> implementation of the temporal tables?
>
> Here is the snippet:
> ----------------------------
> public class StreamingJob {
>
>     public static void main(String[] args) throws Exception {
>
>         ParameterTool params = ParameterTool.fromArgs(args);
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         Properties kafkaConsumerProperties = new Properties();
>         kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
>         kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
>         kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>         kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         DataStream<String> badipStream = env.addSource(new FlinkKafkaConsumer<>(
>                 "badips", new SimpleStringSchema(), kafkaConsumerProperties));
>
>         DataStream<String> badipStreamM = badipStream
>                 .map(new MapFunction<String, String>() {
>                     private static final long serialVersionUID = -686775202L;
>
>                     @Override
>                     public String map(String value) throws Exception {
>                         try {
>                             String[] v = value.split("\\t");
>                             if (v.length > 1) {
>                                 return v[0].toString();
>                             } else {
>                                 return "0.0.0.0";
>                             }
>                         } catch (Exception e) {
>                             System.err.println(e);
>                             return "0.0.0.0";
>                         }
>                     }
>                 });
>
>         Table badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip, r_proctime.proctime");
>
>         tableEnv.registerTable("BadIP", badipTable);
>         TemporalTableFunction badIPTT =
>                 badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
>         tableEnv.registerFunction("BadIPTT", badIPTT);
>
>         DataStream<ObjectNode> inKafkaStream = env
>                 .addSource(new FlinkKafkaConsumer<>("tests",
>                         new JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
>         DataStream<Tuple2<String, String>> inKafkaStreamM = inKafkaStream
>                 .rebalance()
>                 .filter(value -> value != null)
>                 .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
>                     private static final long serialVersionUID = -6867120202L;
>
>                     @Override
>                     public Tuple2<String, String> map(ObjectNode node) throws Exception {
>                         try {
>                             ObjectNode nodeValue = (ObjectNode) node.get("value");
>                             return new Tuple2<>(nodeValue.get("source.ip").asText(),
>                                     nodeValue.get("destination.ip").asText());
>                         } catch (Exception e) {
>                             System.err.println(e);
>                             System.out.println(node);
>                             return null;
>                         }
>                     }
>                 });
>
>         Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM,
>                 "sourceIp, destinationIp, k_proctime.proctime");
>         tableEnv.registerTable("KafkaSource", kafkaSource);
>         Table resultKafkaMalicious = tableEnv.sqlQuery(
>                 "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, "
>                 + "LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp = B.bad_ip");
>
>         TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>                 Types.STRING(),
>                 Types.STRING());
>
>         DataStream<Tuple2<String, String>> outStreamMalicious =
>                 tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>
>         Properties kafkaProducerProperties = new Properties();
>         kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
>         kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
>         kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
>         ObjectMapper mapper = new ObjectMapper();
>         DataStream<String> sinkStreamMaliciousData = outStreamMalicious
>                 .map(new MapFunction<Tuple2<String, String>, String>() {
>                     private static final long serialVersionUID = -6347120202L;
>
>                     @Override
>                     public String map(Tuple2<String, String> tuple) throws Exception {
>                         try {
>                             ObjectNode node = mapper.createObjectNode();
>                             node.put("source.ip", tuple.f0);
>                             node.put("destination.ip", tuple.f1);
>                             return node.toString();
>                         } catch (Exception e) {
>                             System.err.println(e);
>                             System.out.println(tuple);
>                             return null;
>                         }
>                     }
>                 });
>
>         sinkStreamMaliciousData.addSink(new FlinkKafkaProducer<>("recon-data-malicious",
>                 new SimpleStringSchema(), kafkaProducerProperties));
>         env.execute("Flink List Matching");
>     }
> }
> -------------------------------------------------------
>
> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Nishant,
>>
>> You should model the query as a join with a time-versioned table [1].
>> The bad-ips table would be the time-versioned table [2].
>> Since it is a time-versioned table, it could even be updated with new IPs.
>>
>> This type of join will only keep the time-versioned table (the bad IPs) in
>> state and not the other (high-volume) table.
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>>
>> On Wed, 18 Sept 2019 at 14:34, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for your reply.
>>> I have a continuous stream coming from Kafka and a static table of bad IPs. I
>>> want to separate out the records that have a bad IP.
>>>
>>> That is why I was joining them. But with that, 60 GB of memory runs out.
>>>
>>> So I used the query below.
>>> Can you please guide me in this regard?
>>>
>>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> The query that you wrote is not a time-windowed join.
>>>>
>>>> INSERT INTO sourceKafkaMalicious
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>> sourceKafka.`source.ip`=badips.ip
>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>
>>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing
>>>> time (or event time) attribute of badips.
>>>>
>>>> What exactly are you trying to achieve with the query?
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> On Wed, 18 Sept 2019 at 14:02, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am running a time-windowed join query as below:
>>>>>
>>>>> INSERT INTO sourceKafkaMalicious
>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>> sourceKafka.`source.ip`=badips.ip
>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>
>>>>> Since this is a time-windowed join, Flink SQL should automatically clear older
>>>>> records. Somehow the query does not free the heap space and fails
>>>>> with an error after some time.
>>>>>
>>>>> Can you please let me know what could be going wrong, or whether it is an issue?
>>>>>
>>>>> Environment file chunks:
>>>>>
>>>>> --------------------------------------------------------------------------------
>>>>> tables:
>>>>>   - name: sourceKafka
>>>>>     type: source-table
>>>>>     update-mode: append
>>>>>     connector:
>>>>>       type: kafka
>>>>>       version: "universal"
>>>>>       topic: test-data-flatten
>>>>>       properties:
>>>>>         - key: zookeeper.connect
>>>>>           value: x.x.x.x:2181
>>>>>         - key: bootstrap.servers
>>>>>           value: x.x.x.x:9092
>>>>>         - key: group.id
>>>>>           value: testgroup
>>>>>     format:
>>>>>       type: json
>>>>>       fail-on-missing-field: false
>>>>>       json-schema: >
>>>>>         {
>>>>>           type: 'object',
>>>>>           properties: {
>>>>>             'source.ip': {
>>>>>               type: 'string'
>>>>>             },
>>>>>             'source.port': {
>>>>>               type: 'string'
>>>>>             }
>>>>>           }
>>>>>         }
>>>>>       derive-schema: false
>>>>>     schema:
>>>>>       - name: 'source.ip'
>>>>>         type: VARCHAR
>>>>>       - name: 'source.port'
>>>>>         type: VARCHAR
>>>>>
>>>>>   - name: sourceKafkaMalicious
>>>>>     type: sink-table
>>>>>     update-mode: append
>>>>>     connector:
>>>>>       type: kafka
>>>>>       version: "universal"
>>>>>       topic: test-data-mal
>>>>>       properties:
>>>>>         - key: zookeeper.connect
>>>>>           value: x.x.x.x:2181
>>>>>         - key: bootstrap.servers
>>>>>           value: x.x.x.x:9092
>>>>>         - key: group.id
>>>>>           value: testgroupmal
>>>>>     format:
>>>>>       type: json
>>>>>       fail-on-missing-field: false
>>>>>       json-schema: >
>>>>>         {
>>>>>           type: 'object',
>>>>>           properties: {
>>>>>             'source.ip': {
>>>>>               type: 'string'
>>>>>             },
>>>>>             'source.port': {
>>>>>               type: 'string'
>>>>>             }
>>>>>           }
>>>>>         }
>>>>>       derive-schema: false
>>>>>     schema:
>>>>>       - name: 'source.ip'
>>>>>         type: VARCHAR
>>>>>       - name: 'source.port'
>>>>>         type: VARCHAR
>>>>>
>>>>>   - name: badips
>>>>>     type: source-table
>>>>>     #update-mode: append
>>>>>     connector:
>>>>>       type: filesystem
>>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>>     format:
>>>>>       type: csv
>>>>>       fields:
>>>>>         - name: ip
>>>>>           type: VARCHAR
>>>>>       comment-prefix: "#"
>>>>>     schema:
>>>>>       - name: ip
>>>>>         type: VARCHAR
>>>>>
>>>>> execution:
>>>>>   planner: blink
>>>>>   type: streaming
>>>>>   time-characteristic: event-time
>>>>>   periodic-watermarks-interval: 200
>>>>>   result-mode: table
>>>>>   max-table-result-rows: 1000000
>>>>>   parallelism: 3
>>>>>   max-parallelism: 128
>>>>>   min-idle-state-retention: 0
>>>>>   max-idle-state-retention: 0
>>>>>   restart-strategy:
>>>>>     type: fallback
>>>>>
>>>>> configuration:
>>>>>   table.optimizer.join-reorder-enabled: true
>>>>>   table.exec.spill-compression.enabled: true
>>>>>   table.exec.spill-compression.block-size: 128kb
>>>>>
>>>>> # Properties that describe the cluster to which table programs are submitted to.
>>>>>
>>>>> deployment:
>>>>>   response-timeout: 5000
>>>>>
>>>>> --------------------------------------------------------------------------------
>>>>>
>>>>
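
A minimal plain-Java sketch of the state difference discussed in this thread: why a regular join between the Kafka stream and the bad-IP table keeps growing, while the temporal-table join suggested above only has to remember the bad IPs. It has no Flink dependency and does not reproduce Flink's operators; the classes `Connection`, `RegularJoinSketch`, `TemporalJoinSketch`, and `JoinStateDemo` are hypothetical and assume processing-time semantics. A regular inner join has to buffer every row of both inputs, because a row that arrives later on either side may still match any earlier row (a filter on CURRENT_TIMESTAMP does not change that). A processing-time temporal join keeps only the current version of the bad-IP table, keyed by IP, and forgets each connection row once it has been probed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical connection record (stand-in for a Kafka row; not a Flink type).
final class Connection {
    final String sourceIp;
    final String destinationIp;

    Connection(String sourceIp, String destinationIp) {
        this.sourceIp = sourceIp;
        this.destinationIp = destinationIp;
    }
}

// Regular (unbounded) inner join on sourceIp = ip: every row of both inputs
// stays in state, because a row arriving later on either side may still match.
final class RegularJoinSketch {
    private final List<Connection> connectionState = new ArrayList<>();
    private final List<String> badIpState = new ArrayList<>();

    List<Connection> onConnection(Connection c) {
        connectionState.add(c); // kept forever: a matching bad IP may still arrive
        List<Connection> out = new ArrayList<>();
        for (String ip : badIpState) {
            if (ip.equals(c.sourceIp)) {
                out.add(c);
            }
        }
        return out;
    }

    List<Connection> onBadIp(String ip) {
        badIpState.add(ip);
        List<Connection> out = new ArrayList<>();
        for (Connection c : connectionState) { // re-probe every buffered connection
            if (c.sourceIp.equals(ip)) {
                out.add(c);
            }
        }
        return out;
    }

    int stateSize() {
        return connectionState.size() + badIpState.size();
    }
}

// Processing-time temporal join: only the latest version of the bad-IP table
// (keyed by IP) is kept; each connection is probed once and then dropped.
final class TemporalJoinSketch {
    private final Set<String> currentBadIps = new HashSet<>();

    void onBadIp(String ip) {
        currentBadIps.add(ip); // upsert into the current version of the table
    }

    List<Connection> onConnection(Connection c) {
        List<Connection> out = new ArrayList<>();
        if (currentBadIps.contains(c.sourceIp)) { // match against the version valid now
            out.add(c);
        }
        return out; // c itself is not stored
    }

    int stateSize() {
        return currentBadIps.size();
    }
}

final class JoinStateDemo {
    public static void main(String[] args) {
        RegularJoinSketch regular = new RegularJoinSketch();
        TemporalJoinSketch temporal = new TemporalJoinSketch();
        regular.onBadIp("10.0.0.66");
        temporal.onBadIp("10.0.0.66");
        // 1000 connections, none of which come from the bad IP
        for (int i = 0; i < 1000; i++) {
            Connection c = new Connection("192.168.0." + (i % 250), "10.1.1.1");
            regular.onConnection(c);
            temporal.onConnection(c);
        }
        System.out.println("regular join state: " + regular.stateSize());
        System.out.println("temporal join state: " + temporal.stateSize());
    }
}
```

Running `JoinStateDemo.main` prints `regular join state: 1001` and `temporal join state: 1`: the regular join's state grows with the high-volume stream, while the temporal join's state grows only with the bad-IP list. The sketch also shows the semantic trade-off. `RegularJoinSketch.onBadIp` retroactively matches connections that arrived before the IP was listed, whereas the temporal version only affects connections that arrive after the update.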