Assuming that null means that the accumulator was never created is not right especially if null is a valid terminal state while the initial accumulator value is non-null. This is uncommon but possible. Filed https://issues.apache.org/jira/browse/BEAM-11063.
+Aljoscha Krettek <aljos...@apache.org> Is this something you can take a look at? On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <andresgaragi...@gmail.com> wrote: > Hi all, > > I have this problem in a stream pipeline using the runner Apache Flink > 1.19. I want to do an upgrade to my job. I first end the job by using the > Flink API creating a savepoint, and then I start the new version by using > the Flink API passing the savepoint path. > > When the job ends two new records are created. The first one is OK but the > second one is an empty record. > > > My pipeline uses this window strategy: > > > *Window<KV<String, TaggedEvent>> window =* > > * Window.<KV<String, > TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))* > > * .triggering(AfterWatermark.pastEndOfWindow()* > > * > > .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))* > > * > .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))* > > * > .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))* > > * .accumulatingFiredPanes();* > > > I implemented a custom combiner, and I realized that the state of the > combiner is null in the second output. This line ( > https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507) > is evaluated to false, and then it creates an empty accumulator. > > > Is this the expected behavior? > > > Thanks >