Hello All,
I would like some advice on whether session windowing followed by CoGroupByKey is the right way to solve my use case, and if so, whether I am using it correctly.
Please note that I am using the Java SDK 2.19 on Google Cloud Dataflow.
I have two streams of data coming from two different Kafka topics, and I need to correlate them using a common key present in both. I expect all the logs for a given key to arrive within 90 seconds on both topics, so I decided to use session windows.
1. Read data from each Kafka topic as follows:
PCollection<KV<MyKey, POJO1>> collection1 =
    p
        .apply("Read_From_Kafka_Topic1", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic1"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window_1",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_POJO1", ParDo.of(new ParseLogsPOJO1()));

PCollection<KV<MyKey, POJO2>> collection2 =
    p
        .apply("Read_From_Kafka_Topic2", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic2"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window_2",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_POJO2", ParDo.of(new ParseLogsPOJO2()));
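For context, each parse DoFn is roughly along the lines of the sketch below; POJO1.fromBytes and getMyKey are stand-ins for my actual parsing logic:

static class ParseLogsPOJO1 extends DoFn<KafkaRecord<String, byte[]>, KV<MyKey, POJO1>> {
  @ProcessElement
  public void processElement(@Element KafkaRecord<String, byte[]> record,
                             OutputReceiver<KV<MyKey, POJO1>> out) {
    // Deserialize the Kafka payload into the POJO (stand-in parser).
    POJO1 pojo = POJO1.fromBytes(record.getKV().getValue());
    // Key the element by the correlation key (stand-in accessor).
    out.output(KV.of(pojo.getMyKey(), pojo));
  }
}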
2. Put each of the above collections into a session window with a 90-second gap duration:
PCollection<KV<MyKey, POJO1>> sessionWindowedPOJO1 =
    collection1
        .apply("Applying_Session_Window_1",
            Window.<KV<MyKey, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

PCollection<KV<MyKey, POJO2>> sessionWindowedPOJO2 =
    collection2
        .apply("Applying_Session_Window_2",
            Window.<KV<MyKey, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());
3. CoGroupByKey and get the correlated logs:
PCollection<CorrelatedPOJO> coGbkLogs =
    KeyedPCollectionTuple.of("tag1", sessionWindowedPOJO1)
        .and("tag2", sessionWindowedPOJO2)
        .apply("CoGroupByMyKey", CoGroupByKey.create())
        .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));
Is this a correct way to solve my use case?
Looking forward to hearing from someone soon.
Thanks and regards,
Mohil