Thanks Luke, Aljoscha. Let me know if I can help you reproduce the problem. In my case the state is never set to null, but I think it becomes null while the job is stopping. Once I run the job again from the savepoint, the state is recovered normally.
Let's show this with an example:

t0: Add input 1 => accu state [1] => output [1]
t1: Add input 2 => accu state [1,2] => output [1,2]
t2: stop job with savepoint => output [1,2] and output [] (the empty record)
t3: run job from savepoint => accu state [1,2] => no output
t4: Add input 3 => accu state [1,2,3] => output [1,2,3]

Regards

On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <[email protected]> wrote:

> I'll take a look.
>
> On 14.10.20 18:49, Luke Cwik wrote:
> > Assuming that null means that the accumulator was never created is not
> > right, especially if null is a valid terminal state while the initial
> > accumulator value is non-null. This is uncommon but possible. Filed
> > https://issues.apache.org/jira/browse/BEAM-11063.
> >
> > +Aljoscha Krettek <[email protected]> Is this something you can take a
> > look at?
> >
> > On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <[email protected]> wrote:
> >
> >> Hi all,
> >>
> >> I have this problem in a stream pipeline using the Apache Flink 1.19
> >> runner. I want to upgrade my job. I first stop the job through the
> >> Flink API, creating a savepoint, and then I start the new version
> >> through the Flink API, passing the savepoint path.
> >>
> >> When the job stops, two new records are created. The first one is OK,
> >> but the second one is an empty record.
> >>
> >> My pipeline uses this window strategy:
> >>
> >> Window<KV<String, TaggedEvent>> window =
> >>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
> >>         .triggering(AfterWatermark.pastEndOfWindow()
> >>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
> >>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
> >>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
> >>         .accumulatingFiredPanes();
> >>
> >> I implemented a custom combiner, and I realized that the state of the
> >> combiner is null in the second output. This line
> >> (https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
> >> evaluates to false, and then an empty accumulator is created.
> >>
> >> Is this the expected behavior?
> >>
> >> Thanks
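The reporter's combiner isn't included in the thread, but a minimal list-collecting CombineFn (a hypothetical sketch in the same Java style as the pipeline snippet, not the reporter's actual code) reproduces the t0-t4 timeline above and makes the failure visible: if the runner decides at the final firing that no accumulator is stored, createAccumulator() supplies an empty list and the extracted pane is empty.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;

// Collects every input into a list, so each early/late firing with
// accumulatingFiredPanes() emits everything seen so far: [1], [1,2], ...
public class CollectToListFn extends Combine.CombineFn<Integer, List<Integer>, List<Integer>> {

  @Override
  public List<Integer> createAccumulator() {
    // If the runner concludes that no accumulator is stored (e.g. the state
    // reads back null during stop-with-savepoint), this empty list is what
    // gets extracted, producing the empty output pane.
    return new ArrayList<>();
  }

  @Override
  public List<Integer> addInput(List<Integer> accum, Integer input) {
    accum.add(input);
    return accum;
  }

  @Override
  public List<Integer> mergeAccumulators(Iterable<List<Integer>> accums) {
    List<Integer> merged = new ArrayList<>();
    for (List<Integer> accum : accums) {
      merged.addAll(accum);
    }
    return merged;
  }

  @Override
  public List<Integer> extractOutput(List<Integer> accum) {
    return accum;
  }
}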
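And a rough paraphrase of the read path behind the linked FlinkStateInternals line, as described in the report ("evaluates to false, and then an empty accumulator is created"). This is a sketch of the pattern BEAM-11063 questions, not verbatim Beam source; the standalone class and helper name are made up for illustration.

import org.apache.beam.sdk.transforms.Combine;

public class AccumulatorReadSketch {

  // A null stored accumulator is treated as "never created" and a fresh
  // accumulator is extracted instead. That is fine when null really means
  // "no input yet", but it also explains the empty second record if the
  // state only *appears* null while the job is stopping with a savepoint.
  static <InputT, AccumT, OutputT> OutputT readOutput(
      AccumT storedAccum, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
    if (storedAccum != null) {
      return combineFn.extractOutput(storedAccum);
    }
    return combineFn.extractOutput(combineFn.createAccumulator());
  }
}

With a list-collecting combiner like the one sketched above, passing null for storedAccum yields [], which is exactly the empty record at t2.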
