Hi,
I am exploring session windowing in Apache Beam. I created a pipeline to print the window start time and window end time of the elements emitted from GroupByKey, which groups the elements of a PCollection to which a sessions window was applied.
I got this exception:
Exception in thread "main"
java.lang.IllegalArgumentException: public void
main.SlidingWindowX$2.processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow)
unable to provide window -- expected window type from parameter
(org.apache.beam.sdk.transforms.windowing.IntervalWindow) is not a
supertype of actual window type assigned by windowing (W)
I came to know that when GroupByKey is applied on a PCollection to which a sessions window was applied, the output PCollection emitted from GroupByKey doesn't have any WindowingStrategy. Is this correct?
What should be the window type of such elements? How can I know the start time and end time of the session window (the merged windows)?
If I apply GroupByKey again on the result of the 1st GroupByKey, I get this exception:
Exception in thread "main" java.lang.IllegalStateException:
GroupByKey must have a valid Window merge function. Invalid because:
WindowFn has already been consumed by previous GroupByKey
So, should a new window be applied before I do this?
I am obviously missing a crucial point regarding the output PCollection emitted from aggregating transforms. Can you point me to any documentation or code that explains this behaviour?
After going through the code, I understood that when a sessions window is applied, each element is assigned an IntervalWindow with start_time = element_timestamp and end_time = element_timestamp + gap. When an aggregating transform is applied, a MergeContext is created for each key and the windows are merged. Is it possible to get the MergeContext after applying an aggregating transform?
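To make sure I have understood the merge behaviour correctly, here is a small plain-Java sketch of what I believe Sessions does: assign each element a proto-window [timestamp, timestamp + gap) and collapse overlapping proto-windows per key into their span. This is only an illustration of my understanding, not the Beam API; the `Interval` type and `mergeSessions` method here are hypothetical stand-ins for IntervalWindow and the MergeContext logic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {
    static final long GAP_MS = 5_000; // matches Sessions.withGapDuration(standardSeconds(5))

    // Hypothetical stand-in for Beam's IntervalWindow: a half-open range [start, end).
    public record Interval(long start, long end) {}

    public static List<Interval> mergeSessions(List<Long> timestamps) {
        // Per-element assignment: each element gets [ts, ts + gap).
        List<Interval> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new Interval(ts, ts + GAP_MS));
        }
        windows.sort(Comparator.comparingLong(Interval::start));
        // Merge step: overlapping (or abutting) proto-windows collapse into their span.
        List<Interval> merged = new ArrayList<>();
        for (Interval w : windows) {
            if (!merged.isEmpty() && merged.get(merged.size() - 1).end() >= w.start()) {
                Interval last = merged.remove(merged.size() - 1);
                merged.add(new Interval(last.start(), Math.max(last.end(), w.end())));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Elements at t=0s, 3s, 20s: the first two share a session, the third starts a new one.
        for (Interval w : mergeSessions(List.of(0L, 3_000L, 20_000L))) {
            System.out.println(w.start() + ".." + w.end());
        }
    }
}
```

With a 5-second gap, the elements at 0s and 3s merge into one session window [0, 8000) and the element at 20s gets its own window [20000, 25000). Is this the same merging that MergeContext performs per key?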
Can you point me to the design doc for this behaviour?
Pipeline Steps:
1) Read from Kafka using KafkaIO
PCollection<KafkaRecord<String, String>> pcollectIO = p.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("sessionWindowtopic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));
2) Convert each KafkaRecord to a KV
PCollection<KV<String, String>> pcollectKV = pcollectIO
    .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
        @ProcessElement
        public void processElement(@Element KafkaRecord<String, String> record,
            OutputReceiver<KV<String, String>> r) {
            r.output(record.getKV());
        }
    }));
3) Apply the sessions window
PCollection<KV<String, String>> pcollectW2 = pcollectKV
    .apply(Window.<KV<String, String>>into(
        Sessions.withGapDuration(Duration.standardSeconds(5))));
4) 1st GroupByKey
PCollection<KV<String, Iterable<String>>> pcollectAfterGroupBy =
pcollectW2.apply(GroupByKey.<String, String>create());
5) ParDo to print the window start time and the window end time of each element emitted from GroupByKey
pcollectAfterGroupBy.apply(ParDo.of(
    new DoFn<KV<String, Iterable<String>>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c, IntervalWindow b) {
            System.out.println(b.start());
            System.out.println(b.end());
        }
    }));
6) 2nd GroupByKey (used just for exploring the sessions window)
PCollection<KV<String, Iterable<Iterable<String>>>> pcollectCascaded =
    pcollectAfterGroupBy.apply(GroupByKey.<String, Iterable<String>>create());
Thanks,
Rahul