Assuming that null means the accumulator was never created is not correct,
especially if null is a valid terminal accumulator state while the initial
accumulator value is non-null. This is uncommon but possible. Filed
https://issues.apache.org/jira/browse/BEAM-11063.
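
For illustration, here is a minimal sketch (hypothetical names, not the
reporter's combiner) of a CombineFn where the initial accumulator is non-null
but null is a meaningful accumulator value, so "null == never created" does
not hold:

import org.apache.beam.sdk.transforms.Combine;

// Hypothetical example only: a sum that becomes "invalidated" (null) once a
// negative input is seen. The initial accumulator is non-null, yet null is a
// valid accumulator state.
public class NullableSumFn extends Combine.CombineFn<Integer, Integer, Integer> {

  @Override
  public Integer createAccumulator() {
    return 0; // non-null initial accumulator
  }

  @Override
  public Integer addInput(Integer acc, Integer input) {
    if (acc == null || input < 0) {
      return null; // null deliberately encodes "result invalidated"
    }
    return acc + input;
  }

  @Override
  public Integer mergeAccumulators(Iterable<Integer> accs) {
    Integer merged = 0;
    for (Integer acc : accs) {
      if (acc == null) {
        return null; // invalidation is sticky across merges
      }
      merged += acc;
    }
    return merged;
  }

  @Override
  public Integer extractOutput(Integer acc) {
    return acc; // may legitimately be null
  }
}

If the runner treats a stored null accumulator as "not created yet" and
silently substitutes createAccumulator() (i.e. 0), the invalidated state is
lost and an ordinary empty sum is emitted instead.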

+Aljoscha Krettek <aljos...@apache.org> Is this something you can take a
look at?

On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <andresgaragi...@gmail.com>
wrote:

> Hi all,
>
> I have this problem in a streaming pipeline on the Apache Flink 1.19
> runner. I want to upgrade my job: I first stop the job through the Flink
> API, creating a savepoint, and then start the new version through the
> Flink API, passing the savepoint path.
>
> When the job ends, two new records are emitted. The first one is OK, but
> the second one is an empty record.
>
>
> My pipeline uses this window strategy:
>
>
> Window<KV<String, TaggedEvent>> window =
>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
>         .accumulatingFiredPanes();
>
>
> I implemented a custom combiner, and I realized that the combiner's state
> is null in the second output. This check (
> https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
> evaluates to false, and an empty accumulator is created instead.
>
>
> Is this the expected behavior?
>
>
> Thanks
>
