After a GroupByKey, a (single) timestamp needs to be assigned to the
full KV<K, Iterable<V>> element. By default the timestamp chosen is
the end of the window. For the global window, that is a timestamp far
in the future, close to the largest timestamp Beam can represent.
(Python prints these as MAX_TIMESTAMP rather than an exact time;
perhaps we should do something similar for Java.)
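For reference, the exact value in your log can be reproduced with the plain JDK. This is a sketch based on my reading of the Beam Java source, not a Beam API call. It assumes that BoundedWindow.TIMESTAMP_MAX_VALUE is Long.MAX_VALUE microseconds truncated to milliseconds, and that the global window's max timestamp is one day earlier. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, plain JDK only (no Beam dependency).
// Assumption: mirrors how Beam Java defines its maximum timestamp and the
// end of the global window.
class GlobalWindowEnd {
    // Largest representable timestamp: Long.MAX_VALUE microseconds since
    // the epoch, truncated to milliseconds.
    static Instant timestampMaxValue() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
    }

    // Max timestamp of the global window: one day before the largest timestamp.
    static Instant endOfGlobalWindow() {
        return timestampMaxValue().minus(Duration.ofDays(1));
    }

    public static void main(String[] args) {
        // java.time prints years with more than four digits with a leading '+'.
        System.out.println(endOfGlobalWindow());
    }
}
```

This is the same instant (year 294247) that context.timestamp() reported after the GroupByKey. It is not garbage, just the default END_OF_WINDOW choice applied to the global window.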
You can use the withTimestampCombiner [1] method to adjust this
behavior. The currently supported options are END_OF_WINDOW (the
default), EARLIEST (the timestamp of the grouped result is the
timestamp of the earliest element in that group), and LATEST
(analogous to EARLIEST):
pcoll.apply(Window.into(...).withTimestampCombiner(TimestampCombiner.LATEST));
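To make the three options concrete, here is a simplified model of how each policy picks the timestamp for one key's grouped values. It uses the plain JDK, all names in it are hypothetical, and it is not Beam's implementation. In your pipeline, the actual setting would go on the existing Window.<KV<String, String>>into(new GlobalWindows()) chain before the GroupByKey, for example by adding .withTimestampCombiner(TimestampCombiner.EARLIEST).

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model (not Beam code) of how the timestamp of a
// grouped KV<K, Iterable<V>> is chosen under each combiner option.
class TimestampCombinerModel {
    enum Policy { END_OF_WINDOW, EARLIEST, LATEST }

    // elementTimestamps: timestamps of the values grouped under one key in one firing.
    // windowMaxTimestamp: the window's max timestamp (far in the future for the global window).
    static Instant outputTimestamp(Policy policy, List<Instant> elementTimestamps,
                                   Instant windowMaxTimestamp) {
        switch (policy) {
            case EARLIEST:
                return Collections.min(elementTimestamps);
            case LATEST:
                return Collections.max(elementTimestamps);
            case END_OF_WINDOW:
            default:
                return windowMaxTimestamp;
        }
    }

    public static void main(String[] args) {
        List<Instant> ts = Arrays.asList(
                Instant.parse("2018-12-01T21:13:55.621Z"),
                Instant.parse("2018-12-01T21:14:10Z"));
        // Placeholder window end for illustration only.
        Instant windowEnd = Instant.parse("9999-12-31T23:59:59Z");
        for (Policy p : Policy.values()) {
            System.out.println(p + " -> " + outputTimestamp(p, ts, windowEnd));
        }
    }
}
```

When a pane holds a single element, EARLIEST and LATEST both leave the grouped result with that element's own timestamp, which is the behavior you expected.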
Hopefully that helps,
Robert
[1]
https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/windowing/Window.html#withTimestampCombiner-org.apache.beam.sdk.transforms.windowing.TimestampCombiner-
On Sat, Dec 1, 2018 at 10:49 PM <[email protected]> wrote:
>
> Hi,
>
> I was trying out Apache Beam and got unexpected output from the
> following program:
>
> ============
>
> StreamingOptions options =
>     PipelineOptionsFactory.create().as(StreamingOptions.class);
> options.setStreaming(true);
> Pipeline p = Pipeline.create(options);
>
> PCollection<KV<String, String>> kvpCollection = p
>     .apply(Utils.readKafka(
>         "inputs",
>         StringDeserializer.class,
>         StringDeserializer.class,
>         "group-id"))
>     .apply(Window.<KV<String, String>>into(new GlobalWindows())
>         .triggering(AfterPane.elementCountAtLeast(1))
>         .discardingFiredPanes());
>
> kvpCollection
>     .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, Instant>>() {
>       @ProcessElement
>       public void processElement(@Element KV<String, String> input,
>                                  ProcessContext context,
>                                  OutputReceiver<KV<String, Instant>> out) {
>         Instant timestamp = context.timestamp();
>         out.output(KV.of(input.getValue(), timestamp));
>       }
>     }))
>     .apply(GroupByKey.create())
>     .apply(ParDo.of(new DoFn<KV<String, Iterable<Instant>>, Void>() {
>       @ProcessElement
>       public void processElement(@Element KV<String, Iterable<Instant>> input,
>                                  ProcessContext context,
>                                  OutputReceiver<Void> out) {
>         log.info("processElement input: {}, timestamp: {}", input,
>             context.timestamp());
>       }
>     }));
>
> p.run().waitUntilFinish();
>
> ===================
>
> The output is as follows:
>
> processElement input: KV{some-input, [2018-12-01T21:13:55.621Z]}, timestamp: 294247-01-09T04:00:54.775Z
>
> (debugging gets the same results)
>
> So it looks like context.timestamp() is returning garbage after the
> GroupByKey transform.
> I expected the timestamp from before the GroupByKey to be identical to the
> one after the GroupByKey.
>
> Is this expected? Am I doing something wrong (related to the continuously
> triggering global window perhaps)?
> Does it indicate that watermarks (and corresponding timers) will not work
> properly after grouping by key?
>
> I used the Beam Java direct runner, version 2.8.0.
>
> Thanks in advance, kind regards,
>
> Jan