Hello,

My ParDos in the KafkaIO read stage are keeping up with the log timestamps
and reading in real time without any lag. However, it sometimes takes up to
3-4 minutes for data to show up in ParDo(new Correlate()) after the session
window followed by CoGroupByKey. I am definitely doing something wrong. Any
suggestions?
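
For context, the session windows below fire only at
AfterWatermark.pastEndOfWindow(), so no pane can reach Correlate until the
watermark passes the end of a 90-second-gap session. A minimal sketch of an
early-firing variant I could try instead (the 30-second processing-time delay
is an arbitrary placeholder, not what runs today):

    // Sketch: same session window, but with speculative early panes so the
    // downstream CoGroupByKey/Correlate sees data before the watermark closes
    // the session. With discardingFiredPanes(), each early pane carries only
    // the new elements, so Correlate would have to tolerate partial groups.
    Window.<KV<MyKey, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30))))  // placeholder delay
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();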

Thanks and regards
Mohil



On Thu, Aug 6, 2020 at 12:42 AM Mohil Khare <[email protected]> wrote:

> Hello All,
>
> I need advice on whether session windowing followed by CoGroupByKey is the
> correct way to solve my use case and, if yes, whether I am using it
> correctly.
> Please note that I am using the Java SDK 2.19 on Google Dataflow.
>
> I have two streams of data coming from two different Kafka topics, and I
> need to correlate them using a common key present in both. I expect all the
> logs for a key to arrive within 90 seconds on both topics, and hence I
> decided to use a session window.
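>
> For reference, the key class is not shown in this mail; a minimal sketch of
> its shape (field names illustrative), since CoGroupByKey needs a key class
> with value equality and a deterministic coder:
>
>     // Sketch of the common key; "logId" is an illustrative field name.
>     @DefaultCoder(AvroCoder.class)
>     public class MyKey implements Serializable {
>       public String logId;
>
>       public MyKey() {}  // required by AvroCoder
>       public MyKey(String logId) { this.logId = logId; }
>
>       @Override public boolean equals(Object o) {
>         return o instanceof MyKey && ((MyKey) o).logId.equals(logId);
>       }
>       @Override public int hashCode() { return logId.hashCode(); }
>     }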
>
> 1. Read data from the Kafka topics like the following (a sketch of the
> ParseLogs DoFns follows the two reads):
>
>     PCollection<KV<MyKey, POJO1>> collection1 = p
>         .apply("Read_From_Kafka_Topic1", KafkaIO.<String, byte[]>read()
>             .withBootstrapServers(servers)
>             .withTopics(Arrays.asList("topic1"))
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(ByteArrayDeserializer.class)
>             .withConsumerConfigUpdates(kafkaConsumerProperties)
>             .withConsumerFactoryFn(consumerFactoryObj)
>             .commitOffsetsInFinalize())
>         .apply("Applying_Fixed_Window_Topic1",
>             Window.<KafkaRecord<String, byte[]>>into(
>                     FixedWindows.of(Duration.standardSeconds(10)))
>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                 .withAllowedLateness(Duration.standardSeconds(360))
>                 .discardingFiredPanes())
>         .apply("Convert_KafkaRecord_To_POJO1", ParDo.of(new ParseLogsPOJO1()));
>
>
>     PCollection<KV<MyKey, POJO2>> collection2 = p
>         .apply("Read_From_Kafka_Topic2", KafkaIO.<String, byte[]>read()
>             .withBootstrapServers(servers)
>             .withTopics(Arrays.asList("topic2"))
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(ByteArrayDeserializer.class)
>             .withConsumerConfigUpdates(kafkaConsumerProperties)
>             .withConsumerFactoryFn(consumerFactoryObj)
>             .commitOffsetsInFinalize())
>         .apply("Applying_Fixed_Window_Topic2",
>             Window.<KafkaRecord<String, byte[]>>into(
>                     FixedWindows.of(Duration.standardSeconds(10)))
>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                 .withAllowedLateness(Duration.standardSeconds(360))
>                 .discardingFiredPanes())
>         .apply("Convert_KafkaRecord_To_POJO2", ParDo.of(new ParseLogsPOJO2()));
>
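> The two ParseLogs DoFns are omitted in this mail; a minimal sketch of their
> shape (deserialize is a hypothetical placeholder for the real payload
> decoding):
>
>     // Sketch of ParseLogsPOJO1: keys each Kafka payload by the common key.
>     class ParseLogsPOJO1 extends DoFn<KafkaRecord<String, byte[]>, KV<MyKey, POJO1>> {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         POJO1 pojo = deserialize(c.element().getKV().getValue());
>         c.output(KV.of(new MyKey(pojo.logId), pojo));  // MyKey as sketched above
>       }
>
>       // Placeholder: real code decodes the log bytes (JSON/Avro/...) into POJO1.
>       private POJO1 deserialize(byte[] payload) { return new POJO1(); }
>     }
>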
>
> 2. Put each of the above collections into a session window with a 90-second
> gap (a small session sanity-check sketch follows the code):
>
>
>     PCollection<KV<MyKey, POJO1>> sessionWindowedPOJO1 = collection1
>         .apply("Applying_Session_Window_POJO1",
>             Window.<KV<MyKey, POJO1>>into(
>                     Sessions.withGapDuration(Duration.standardSeconds(90)))
>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>                 .withAllowedLateness(Duration.ZERO)
>                 .discardingFiredPanes());
>
>     PCollection<KV<MyKey, POJO2>> sessionWindowedPOJO2 = collection2
>         .apply("Applying_Session_Window_POJO2",
>             Window.<KV<MyKey, POJO2>>into(
>                     Sessions.withGapDuration(Duration.standardSeconds(90)))
>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>                 .withAllowedLateness(Duration.ZERO)
>                 .discardingFiredPanes());
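>
> As a sanity check on the session semantics, a small test sketch on the
> DirectRunner (String keys instead of MyKey for brevity; imports from
> org.apache.beam.sdk.* and org.joda.time assumed) asserting that two
> elements 60 seconds apart merge into a single 90-second-gap session:
>
>     @Rule public final transient TestPipeline p = TestPipeline.create();
>
>     @Test
>     public void twoElementsWithin90sShareOneSession() {
>       TestStream<KV<String, String>> stream =
>           TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
>               .addElements(TimestampedValue.of(KV.of("k", "a"), new Instant(0L)))
>               .addElements(TimestampedValue.of(KV.of("k", "b"), new Instant(60000L)))
>               .advanceWatermarkToInfinity();
>
>       PCollection<String> sessionKeys = p
>           .apply(stream)
>           .apply(Window.<KV<String, String>>into(
>               Sessions.withGapDuration(Duration.standardSeconds(90))))
>           .apply(GroupByKey.<String, String>create())
>           .apply(Keys.<String>create());
>
>       // Exactly one grouped output => the two elements merged into one session.
>       PAssert.that(sessionKeys).containsInAnyOrder("k");
>       p.run();
>     }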
>
>
> 3. CoGroupByKey and get the correlated logs:
>
>
>     PCollection<CorrelatedPOJO> coGbkLogs =
>         KeyedPCollectionTuple.of("tag1", sessionWindowedPOJO1)
>             .and("tag2", sessionWindowedPOJO2)
>             .apply("CoGroupByMyKey", CoGroupByKey.create())
>             .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));
>
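> Correlate itself is omitted in this mail; a minimal sketch of its shape
> (buildCorrelation is a hypothetical placeholder for the real merge logic):
>
>     // Sketch of Correlate: pulls both sides of the CoGbkResult by the
>     // string tags used above and emits one output per matching pair.
>     class Correlate extends DoFn<KV<MyKey, CoGbkResult>, CorrelatedPOJO> {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         Iterable<POJO1> side1 = c.element().getValue().getAll("tag1");
>         Iterable<POJO2> side2 = c.element().getValue().getAll("tag2");
>         for (POJO1 a : side1) {
>           for (POJO2 b : side2) {
>             c.output(buildCorrelation(c.element().getKey(), a, b));
>           }
>         }
>       }
>
>       // Placeholder: real code merges the two sides into a CorrelatedPOJO.
>       private CorrelatedPOJO buildCorrelation(MyKey key, POJO1 a, POJO2 b) {
>         return new CorrelatedPOJO();  // illustrative stub
>       }
>     }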
>
>
>
>    Is this a correct way to solve my use case?
>
>
> Looking forward to hearing from someone soon.
>
>
> Thanks and Regards
>
> Mohil
>
