Hi,

I was trying out Apache Beam and got unexpected output from the following
program:

============

        StreamingOptions options =
                PipelineOptionsFactory.create().as(StreamingOptions.class);
        options.setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PCollection<KV<String, String>> kvpCollection = p
                .apply(Utils.readKafka(
                        "inputs",
                        StringDeserializer.class,
                        StringDeserializer.class,
                        "group-id"))
                .apply(Window.<KV<String, String>>into(
                        new GlobalWindows())
                        .triggering(AfterPane.elementCountAtLeast(1))
                        .discardingFiredPanes());

        kvpCollection
                // Re-key by the Kafka value and carry the element timestamp as the new value.
                .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, Instant>>() {
                    @ProcessElement
                    public void processElement(@Element KV<String, String> input,
                                               ProcessContext context,
                                               OutputReceiver<KV<String, Instant>> out) {
                        Instant timestamp = context.timestamp();
                        out.output(KV.of(input.getValue(), timestamp));
                    }
                }))
                .apply(GroupByKey.create())
                // Log each grouped pane together with the timestamp seen after GroupByKey.
                .apply(ParDo.of(new DoFn<KV<String, Iterable<Instant>>, Void>() {
                    @ProcessElement
                    public void processElement(@Element KV<String, Iterable<Instant>> input,
                                               ProcessContext context,
                                               OutputReceiver<Void> out) {
                        log.info("processElement input: {}, timestamp: {}",
                                input, context.timestamp());
                    }
                }));

        p.run().waitUntilFinish();

===================

The output is as follows:

processElement input: KV{some-input, [2018-12-01T21:13:55.621Z]}, timestamp: 294247-01-09T04:00:54.775Z

(Stepping through in a debugger shows the same values.)

So it looks like context.timestamp() returns a bogus value (year 294247)
after the GroupByKey transform. I expected the timestamp after the
GroupByKey to be identical to the one from before it.
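
For reference, the logged value does not look random: it appears to match
the maximum timestamp of Beam's global window. The sketch below uses only
the JDK and rests on one assumption about Beam's constants, namely that the
largest representable timestamp is Long.MAX_VALUE microseconds truncated to
milliseconds and that the global window ends one day before it. The class
and method names (GlobalWindowEnd, maxTimestampMillis,
endOfGlobalWindowMillis) are made up for this illustration and are not Beam
APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Beam: reproduces the timestamp seen in the log.
class GlobalWindowEnd {

    // Assumption: Beam's maximum timestamp is Long.MAX_VALUE microseconds,
    // truncated to millisecond precision.
    static long maxTimestampMillis() {
        return TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
    }

    // Assumption: the global window ends one day before the maximum timestamp.
    static long endOfGlobalWindowMillis() {
        return maxTimestampMillis() - Duration.ofDays(1).toMillis();
    }

    public static void main(String[] args) {
        Instant end = Instant.ofEpochMilli(endOfGlobalWindowMillis());
        // java.time prefixes years above 9999 with '+', unlike the Joda-Time
        // Instant in the log output above.
        System.out.println(end); // prints +294247-01-09T04:00:54.775Z
    }
}
```

If that assumption holds, the value after the GroupByKey is the end of the
window rather than the element's own timestamp. That would be consistent
with the grouped output taking its timestamp from the windowing strategy's
TimestampCombiner, which as far as I know defaults to END_OF_WINDOW. One
thing to try would be adding
.withTimestampCombiner(TimestampCombiner.EARLIEST) to the Window transform.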

Is this expected? Am I doing something wrong (related to the continuously
triggering global window perhaps)?
Does it indicate that watermarks (and corresponding timers) will not work
properly after grouping by key?

I used the Beam Java SDK with the direct runner, version 2.8.0.

Thanks in advance, kind regards,

Jan
