Thanks Luke, Aljoscha

Let me know if I can help you to reproduce the problem.
In my case the combiner never sets the state to null, but I think it becomes
null while the job is stopping. Once I run the job again from the savepoint,
the state is recovered normally.

Here is an example:

t0: Add input 1 => accu state [1] => output [1]
t1: Add input 2 => accu state [1,2] => output [1,2]
t2: stop job with savepoint => output [1,2] and *output []*
t3: run job from savepoint => accu state [1,2] => no output
t4: Add input 3 => accu state [1,2,3] => output [1,2,3]

Regards

On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <[email protected]> wrote:

> I'll take a look.
>
> On 14.10.20 18:49, Luke Cwik wrote:
> > Assuming that null means that the accumulator was never created is not
> > right, especially if null is a valid terminal state while the
> > initial accumulator value is non-null. This is uncommon but possible. Filed
> > https://issues.apache.org/jira/browse/BEAM-11063.
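To make Luke's point concrete, here is a minimal plain-Java sketch (no Beam or Flink dependencies) of two ways to read a stored accumulator. The combiner semantics are hypothetical: `createAccumulator()` returns a non-null `0L`, and a negative input moves the accumulator into the terminal value `null`. A read that treats "stored value is null" as "never created" silently resets a terminated key, while checking for presence separately keeps the two cases apart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical combiner semantics for illustration only (not Beam's CombineFn):
// the initial accumulator is non-null, but null is a valid terminal state.
class NullTerminalAccumulator {

    static Long createAccumulator() {
        return 0L;
    }

    static Long addInput(Long accum, long input) {
        if (accum == null || input < 0) {
            return null; // terminal state: once closed, the key stays closed
        }
        return accum + input;
    }

    // Treats a null value as "accumulator was never created".
    static Long readNullMeansAbsent(Map<String, Long> store, String key) {
        Long stored = store.get(key);
        return stored != null ? stored : createAccumulator();
    }

    // Distinguishes "no entry for this key" from "entry present with value null".
    static Long readWithPresenceCheck(Map<String, Long> store, String key) {
        return store.containsKey(key) ? store.get(key) : createAccumulator();
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        Long accum = readWithPresenceCheck(store, "k"); // 0, never created
        accum = addInput(accum, 5);                     // 5
        accum = addInput(accum, -1);                    // null, terminal
        store.put("k", accum);                          // HashMap permits null values

        System.out.println(readNullMeansAbsent(store, "k"));   // 0    (terminal state lost)
        System.out.println(readWithPresenceCheck(store, "k")); // null (terminal state kept)
    }
}
```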
> >
> > +Aljoscha Krettek <[email protected]> Is this something you can take a
> > look at?
> >
> > On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <[email protected]>
> > wrote:
> >
> >> Hi all,
> >>
> >> I have this problem in a streaming pipeline using the Apache Flink 1.19
> >> runner. I want to upgrade my job. I first stop the job through the Flink
> >> API, creating a savepoint, and then I start the new version through the
> >> Flink API, passing the savepoint path.
> >>
> >> When the job ends, two new records are emitted. The first one is correct,
> >> but the second one is an empty record.
> >>
> >>
> >> My pipeline uses this window strategy:
> >>
> >>
> >> Window<KV<String, TaggedEvent>> window =
> >>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
> >>         .triggering(AfterWatermark.pastEndOfWindow()
> >>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
> >>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
> >>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
> >>         .accumulatingFiredPanes();
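The window strategy uses `.accumulatingFiredPanes()`, which is why each output in the timeline at the top of this thread contains every input seen so far rather than only the newest one. The plain-Java sketch below (not Beam's implementation) contrasts that with discarding mode for a hypothetical combiner that collects inputs into a list. It assumes, for simplicity, one early firing per element, whereas the real trigger fires after the processing-time `delay`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of pane accumulation modes (not Beam code).
// Simplifying assumption: the trigger fires once after every input element.
class PaneModes {

    static List<List<Integer>> firePanes(List<Integer> inputs, boolean accumulating) {
        List<List<Integer>> panes = new ArrayList<>();
        List<Integer> accum = new ArrayList<>();
        for (int input : inputs) {
            accum.add(input);
            panes.add(new ArrayList<>(accum)); // emit a copy of the current pane contents
            if (!accumulating) {
                accum.clear(); // discarding mode drops what has already been fired
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        List<Integer> inputs = Arrays.asList(1, 2, 3);
        System.out.println(firePanes(inputs, true));  // [[1], [1, 2], [1, 2, 3]]
        System.out.println(firePanes(inputs, false)); // [[1], [2], [3]]
    }
}
```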
> >>
> >>
> >> I implemented a custom combiner, and I noticed that the combiner state is
> >> null when the second output is produced. This line (
> >> https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
> >> evaluates to false, so a new, empty accumulator is created.
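The effect described here can be sketched as follows, assuming a list-collecting combiner like the one in the timeline: if the state read returns `null`, the fallback path creates a fresh accumulator, and extracting output from it yields an empty list, which matches the empty record. The class and its static methods are hypothetical stand-ins named after the `CombineFn` methods; this is not the `FlinkStateInternals` source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a "read state, fall back to a new accumulator" path.
// It mirrors the shape of the null check described above, not the runner code.
class EmptyRecordSketch {

    // Stand-ins for a list-collecting combiner's createAccumulator/extractOutput.
    static List<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    static List<Integer> extractOutput(List<Integer> accum) {
        return new ArrayList<>(accum);
    }

    // If the stored accumulator is null, a fresh (empty) one is used instead.
    static List<Integer> readAndExtract(List<Integer> stored) {
        List<Integer> accum = (stored != null) ? stored : createAccumulator();
        return extractOutput(accum);
    }

    public static void main(String[] args) {
        List<Integer> stored = new ArrayList<>(Arrays.asList(1, 2));
        System.out.println(readAndExtract(stored)); // [1, 2] (state read succeeds)
        System.out.println(readAndExtract(null));   // []     (state read returns null)
    }
}
```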
> >>
> >>
> >> Is this the expected behavior?
> >>
> >>
> >> Thanks
> >>
> >
>
>
