Hi,
I was trying out Apache Beam and got unexpected output from the following
program:
============
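import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Instant;

// Utils.readKafka(...) and the logger `log` are my own helpers, defined elsewhere (not shown).
StreamingOptions options =
    PipelineOptionsFactory.create().as(StreamingOptions.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);

// Read key/value strings from Kafka into the global window,
// firing a pane for every element.
PCollection<KV<String, String>> kvpCollection = p
    .apply(Utils.readKafka(
        "inputs",
        StringDeserializer.class,
        StringDeserializer.class,
        "group-id"))
    .apply(Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(AfterPane.elementCountAtLeast(1))
        .discardingFiredPanes());

kvpCollection
    // Re-key each element by its value and emit its event timestamp.
    .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, Instant>>() {
        @ProcessElement
        public void processElement(@Element KV<String, String> input,
                                   ProcessContext context,
                                   OutputReceiver<KV<String, Instant>> out) {
            Instant timestamp = context.timestamp();
            out.output(KV.of(input.getValue(), timestamp));
        }
    }))
    .apply(GroupByKey.create())
    // Log the grouped timestamps next to the timestamp of the grouped element.
    .apply(ParDo.of(new DoFn<KV<String, Iterable<Instant>>, Void>() {
        @ProcessElement
        public void processElement(@Element KV<String, Iterable<Instant>> input,
                                   ProcessContext context,
                                   OutputReceiver<Void> out) {
            log.info("processElement input: {}, timestamp: {}", input,
                context.timestamp());
        }
    }));

p.run().waitUntilFinish();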
===================
The output is as follows:
processElement input: KV{some-input, [2018-12-01T21:13:55.621Z]}, timestamp: 294247-01-09T04:00:54.775Z
(Stepping through it in a debugger shows the same values.)
So it looks like context.timestamp() is returning garbage after the
GroupByKey transform.
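The logged value may not be arbitrary, though. Assuming (as I read Beam's source, not checked against 2.8.0 specifically) that BoundedWindow.TIMESTAMP_MAX_VALUE is Long.MAX_VALUE microseconds truncated to milliseconds, and that the global window's maxTimestamp() is one standard day earlier, plain java.time arithmetic reproduces the exact timestamp from the log. The class name GlobalWindowEnd is just a hypothetical name for this check:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class: recomputes Beam's timestamp bounds with java.time only.
public class GlobalWindowEnd {
    // Assumption: BoundedWindow.TIMESTAMP_MAX_VALUE is Long.MAX_VALUE
    // microseconds, truncated to milliseconds.
    static Instant timestampMaxValue() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
    }

    // Assumption: the global window's maxTimestamp() is one standard day
    // before TIMESTAMP_MAX_VALUE.
    static Instant endOfGlobalWindow() {
        return timestampMaxValue().minus(Duration.ofDays(1));
    }

    public static void main(String[] args) {
        // java.time prefixes years above 9999 with '+', unlike the Joda-Time log output.
        System.out.println(endOfGlobalWindow()); // prints +294247-01-09T04:00:54.775Z
    }
}
```

If those assumptions hold, the element after the GroupByKey carries the end-of-global-window timestamp rather than a corrupted one.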
I expected the timestamp from before the GroupByKey to be identical to the
one after the GroupByKey.
Is this expected? Am I doing something wrong (perhaps related to the
continuously triggering global window)?
Does it indicate that watermarks (and corresponding timers) will not work
properly after grouping by key?
I used the Beam Java DirectRunner, version 2.8.0.
Thanks in advance, kind regards,
Jan