I'll take a look.

On 14.10.20 18:49, Luke Cwik wrote:
Assuming that null means the accumulator was never created is not
right, especially when null is a valid terminal state and the
initial accumulator value is non-null. This is uncommon but possible. Filed
https://issues.apache.org/jira/browse/BEAM-11063.
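
To make the ambiguity concrete, here is a minimal, self-contained Java sketch. It is not the Beam or Flink code. The class names NullCheckedCell and PresenceTrackedCell are hypothetical and exist only for illustration. The first cell treats a stored null as "never created" and falls back to a fresh accumulator. The second tracks presence separately, so null stays a legal accumulator value.

```java
import java.util.function.Supplier;

final class NullAccumulatorSketch {

    /** Hypothetical state cell that treats a stored null as "no accumulator yet". */
    static final class NullCheckedCell<A> {
        private final Supplier<A> createAccumulator;
        private A stored; // null is ambiguous here: unset, or a legitimately written null?

        NullCheckedCell(Supplier<A> createAccumulator) {
            this.createAccumulator = createAccumulator;
        }

        void write(A value) {
            stored = value;
        }

        A read() {
            // A written null is indistinguishable from "never written".
            return stored != null ? stored : createAccumulator.get();
        }
    }

    /** Hypothetical state cell that records presence separately from the value. */
    static final class PresenceTrackedCell<A> {
        private final Supplier<A> createAccumulator;
        private A stored;
        private boolean present; // set on the first write, even if the value is null

        PresenceTrackedCell(Supplier<A> createAccumulator) {
            this.createAccumulator = createAccumulator;
        }

        void write(A value) {
            stored = value;
            present = true;
        }

        A read() {
            // Only fall back to a fresh accumulator if nothing was ever written.
            return present ? stored : createAccumulator.get();
        }
    }

    public static void main(String[] args) {
        // Initial accumulator is non-null ("EMPTY"); null is a valid later state.
        NullCheckedCell<String> nullChecked = new NullCheckedCell<>(() -> "EMPTY");
        nullChecked.write(null);
        System.out.println(nullChecked.read()); // prints "EMPTY": the written null is lost

        PresenceTrackedCell<String> tracked = new PresenceTrackedCell<>(() -> "EMPTY");
        tracked.write(null);
        System.out.println(tracked.read()); // prints "null": the written null is kept
    }
}
```

Whether an explicit presence flag is the right fix for the runner is a separate question. The sketch only shows why the null check alone cannot tell the two cases apart.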

+Aljoscha Krettek <[email protected]> Is this something you can take a
look at?

On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <[email protected]>
wrote:

Hi all,

I have a problem in a streaming pipeline running on the Apache Flink runner
(Flink 1.19). I want to upgrade my job. I first stop the job through the
Flink API, creating a savepoint, and then start the new version through
the Flink API, passing the savepoint path.

When the job ends, two new records are created. The first one is OK, but the
second one is an empty record.


My pipeline uses this window strategy:


    Window<KV<String, TaggedEvent>> window =
        Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
            .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
            .accumulatingFiredPanes();


I implemented a custom combiner and noticed that the combiner's state is
null in the second output. The check on this line (
https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
evaluates to false, so an empty accumulator is created.


Is this the expected behavior?


Thanks


