There are multiple things that come together here, I'm afraid:

1. There is additional output when stopping with a savepoint. It would be good to know where that comes from.

2. The state internals implementation does in fact seem wrong. We don't differentiate the cases of "never created an accumulator" and "my accumulator is null".

@Andrés, could you put breakpoints in your Combiner implementation and see when that second output happens and why? A stacktrace would probably help.

Regarding the state internals: we would basically need to introduce one more layer. Instead of keeping an AccumT we would need to keep an Option<AccumT> or something of that sort. (Not saying Java Optional here, on purpose.) However, changing the state type would have the consequence that savepoints are no longer compatible, i.e. you could not restore a job from before this change using a Beam version after this change. So I'm very reluctant.
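
To illustrate the idea (just a sketch, not actual Beam code; all names here are made up):

import org.apache.beam.sdk.transforms.Combine.CombineFn;

/**
 * Sketch: wrap the stored accumulator so that "never created"
 * and "created, but currently null" are distinct states.
 */
final class MaybeAccum<AccumT> {
  private boolean created;  // false => no accumulator was ever created
  private AccumT accum;     // may legitimately be null once created

  AccumT readOrCreate(CombineFn<?, AccumT, ?> fn) {
    if (!created) {
      accum = fn.createAccumulator();
      created = true;
    }
    return accum;
  }

  boolean isCreated() {
    return created;
  }

  void set(AccumT newAccum) {
    accum = newAccum;
    created = true;
  }
}

The point being that this wrapper, not the bare AccumT, is what would have to be written into the Flink state, and that changed serialized form is exactly what breaks savepoint compatibility.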


On 15.10.20 11:51, Andrés Garagiola wrote:
Thanks Luke, Aljoscha

Let me know if I can help you reproduce the problem.
In my case the state is never set to null, but I think it becomes null
while the job is stopping. Once I run the job again from the savepoint,
the state is recovered normally.

Let me show this with an example:

t0: Add input 1 => accu state [1] => output [1]
t1: Add input 2 => accu state [1,2] => output [1,2]
t2: stop job with savepoint => output [1,2] and output []
t3: run job from savepoint => accu state [1,2] => no output
t4: Add input 3 => accu state [1,2,3] => output [1,2,3]

Regards

On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <[email protected]>
wrote:

I'll take a look.

On 14.10.20 18:49, Luke Cwik wrote:
Assuming that null means the accumulator was never created is not right, especially if null is a valid terminal state while the initial accumulator value is non-null. This is uncommon but possible. Filed https://issues.apache.org/jira/browse/BEAM-11063.
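
For anyone wondering how null can be a valid terminal state, here is a contrived example (not from the original report, and its accumulator coder would have to be a NullableCoder):

import org.apache.beam.sdk.transforms.Combine;

// createAccumulator() returns a non-null sentinel, but null is a valid
// terminal accumulator state meaning "conflicting values seen". A runner
// that treats null as "never created" silently replaces this terminal
// state with a fresh accumulator.
class UniqueValueFn extends Combine.CombineFn<String, String, String> {
  private static final String NOTHING_SEEN = "";

  @Override
  public String createAccumulator() {
    return NOTHING_SEEN;  // non-null initial accumulator
  }

  @Override
  public String addInput(String accum, String input) {
    if (accum == null) {
      return null;  // conflict is terminal
    }
    if (accum.equals(NOTHING_SEEN)) {
      return input;  // first value seen
    }
    return accum.equals(input) ? accum : null;  // mismatch => conflict
  }

  @Override
  public String mergeAccumulators(Iterable<String> accums) {
    String merged = NOTHING_SEEN;
    for (String a : accums) {
      if (a == null) {
        return null;  // any conflict is terminal
      }
      if (a.equals(NOTHING_SEEN)) {
        continue;
      }
      if (merged.equals(NOTHING_SEEN)) {
        merged = a;
      } else if (!merged.equals(a)) {
        return null;  // two different values => conflict
      }
    }
    return merged;
  }

  @Override
  public String extractOutput(String accum) {
    return accum;  // null means "no unique value"
  }
}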

+Aljoscha Krettek <[email protected]> Is this something you can take a
look at?

On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <[email protected]> wrote:

Hi all,

I have this problem in a streaming pipeline using the Apache Flink runner, Flink 1.19. I want to upgrade my job: I first stop the job through the Flink API, creating a savepoint, and then start the new version through the Flink API, passing the savepoint path.
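
For reference, the equivalent CLI commands look roughly like this (the paths and job id are placeholders; I am calling the Flink API, but the semantics are the same):

# stop the running job and write a savepoint
flink stop -p /path/to/savepoints <jobId>

# start the new version from that savepoint
flink run -s /path/to/savepoints/savepoint-<id> new-version.jar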

When the job stops, two new records are created. The first one is OK, but the second one is an empty record.


My pipeline uses this window strategy:


Window<KV<String, TaggedEvent>> window =
    Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
        .accumulatingFiredPanes();


I implemented a custom combiner, and I realized that the state of the combiner is null for the second output. This line (
https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507
) evaluates to false, and then an empty accumulator is created.
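
Paraphrasing the linked code from memory, the check has roughly this shape (not an exact copy of the Beam source):

AccumT accum = state.value();  // Flink ValueState<AccumT>
if (accum != null) {
  return combineFn.extractOutput(accum);
} else {
  // "no accumulator stored" and "stored accumulator is null" are
  // indistinguishable here, so a fresh empty accumulator is created
  // and its (empty) output emitted: that is the empty second record.
  return combineFn.extractOutput(combineFn.createAccumulator());
}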


Is this the expected behavior?


Thanks