Hello,

My ParDos in the KafkaIO read stage keep up with the log timestamps and read them in real time without any lag. However, it sometimes takes up to 3-4 minutes for data to show up in ParDo(new Correlate()) after the session window followed by CoGroupByKey. I am definitely doing something wrong. Any suggestions?
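For reference, the Correlate DoFn is roughly like this (a simplified sketch; tag1/tag2 are the TupleTags from step 3 of my earlier mail below, and the CorrelatedPOJO construction is illustrative):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;

// Simplified sketch: for each key, pull the POJO1 and POJO2 logs
// grouped by CoGroupByKey and emit one correlated record per pair.
class Correlate extends DoFn<KV<MyKey, CoGbkResult>, CorrelatedPOJO> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        MyKey key = c.element().getKey();
        Iterable<POJO1> logs1 = c.element().getValue().getAll(tag1);
        Iterable<POJO2> logs2 = c.element().getValue().getAll(tag2);
        for (POJO1 log1 : logs1) {
            for (POJO2 log2 : logs2) {
                c.output(new CorrelatedPOJO(key, log1, log2));
            }
        }
    }
}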
Thanks and regards
Mohil

On Thu, Aug 6, 2020 at 12:42 AM Mohil Khare <[email protected]> wrote:

> Hello All,
>
> I need to seek advice on whether Session Windowing followed by
> CoGroupByKey is a correct way to solve my use case, and if yes, whether
> I am using it correctly.
> Please note that I am using Java SDK 2.19 on Google Dataflow.
>
> I have two streams of data coming from two different Kafka topics, and I
> need to correlate them using the common key present in both of them. I
> expect all the logs for a key to arrive within 90 seconds on both topics,
> and hence I decided to use a session window.
>
> 1. Read data from the Kafka topics like the following:
>
> PCollection<KV<MyKey, POJO1>> collection1 = p
>     .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
>         .withBootstrapServers(servers)
>         .withTopics(Arrays.asList("topic1"))
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>         .withConsumerFactoryFn(consumerFactoryObj)
>         .commitOffsetsInFinalize())
>     .apply("Applying_Fixed_Window",
>         Window.<KafkaRecord<String, byte[]>>into(
>                 FixedWindows.of(Duration.standardSeconds(10)))
>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>             .withAllowedLateness(Duration.standardSeconds(360))
>             .discardingFiredPanes())
>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>         ParDo.of(new ParseLogsPOJO1()));
>
> PCollection<KV<MyKey, POJO2>> collection2 = p
>     .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
>         .withBootstrapServers(servers)
>         .withTopics(Arrays.asList("topic2"))
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>         .withConsumerFactoryFn(consumerFactoryObj)
>         .commitOffsetsInFinalize())
>     .apply("Applying_Fixed_Window",
>         Window.<KafkaRecord<String, byte[]>>into(
>                 FixedWindows.of(Duration.standardSeconds(10)))
>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>             .withAllowedLateness(Duration.standardSeconds(360))
>             .discardingFiredPanes())
>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>         ParDo.of(new ParseLogsPOJO2()));
>
> 2. Put each of the above collections in a session window with a gap
> duration of 90 seconds:
>
> PCollection<KV<MyKey, POJO1>> sessionWindowedPOJO1 = collection1
>     .apply("Applying_Session_Window",
>         Window.<KV<MyKey, POJO1>>into(
>                 Sessions.withGapDuration(Duration.standardSeconds(90)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.ZERO)
>             .discardingFiredPanes());
>
> PCollection<KV<MyKey, POJO2>> sessionWindowedPOJO2 = collection2
>     .apply("Applying_Session_Window",
>         Window.<KV<MyKey, POJO2>>into(
>                 Sessions.withGapDuration(Duration.standardSeconds(90)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.ZERO)
>             .discardingFiredPanes());
>
> 3. CoGroupByKey and get correlated logs:
>
> final TupleTag<POJO1> tag1 = new TupleTag<>();
> final TupleTag<POJO2> tag2 = new TupleTag<>();
>
> PCollection<CorrelatedPOJO> coGbkLogs =
>     KeyedPCollectionTuple.of(tag1, sessionWindowedPOJO1)
>         .and(tag2, sessionWindowedPOJO2)
>         .apply("CoGroupByMyKey", CoGroupByKey.create())
>         .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));
>
> Is this a correct way to solve my use case?
>
> Looking forward to hearing from someone soon.
>
> Thanks and Regards
> Mohil
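P.S. In case it helps with debugging: a pass-through DoFn like the following can be dropped in between CoGroupByKey and the Correlate step to quantify the delay, by logging how far wall-clock time is behind each element's timestamp (a minimal sketch; the class name and log message are illustrative):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Pass-through DoFn: logs the gap between wall-clock time and the
// element timestamp, then re-emits the element unchanged.
class LogArrivalLag extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, CoGbkResult>> {
    private static final Logger LOG = LoggerFactory.getLogger(LogArrivalLag.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        long lagMillis = Instant.now().getMillis() - c.timestamp().getMillis();
        LOG.info("Key {} arrived {} ms behind its element timestamp",
            c.element().getKey(), lagMillis);
        c.output(c.element());
    }
}

It would be applied like this:

    .apply("CoGroupByMyKey", CoGroupByKey.create())
    .apply("Log_Arrival_Lag", ParDo.of(new LogArrivalLag()))
    .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));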
