There are multiple things that come together here, I'm afraid:
1. There is additional output when stopping with a savepoint. It would
be good to know where that comes from.
2. The state internals implementation does in fact seem wrong. We don't
differentiate between the cases of "never created an accumulator" and
"my accumulator is null".
@Andrés, could you put breakpoints in your Combiner implementation and
see when that second output happens and why (a stack trace would
probably help)?
Regarding the state internals: we would basically need to introduce one
more layer. Instead of keeping an AccumT, we would need to keep an
Option<AccumT> or something of that sort; I'm deliberately not saying
Java Optional here. However, changing the state type would have the
consequence that savepoints are no longer compatible, i.e. you cannot
restore a job from before this change using a Beam version after this
change. So I'm very reluctant.
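For illustration, a minimal sketch of such a holder type. This is a
hand-rolled wrapper with hypothetical names, not an existing Beam class:

final class AccumHolder<AccumT> {
  // Distinguishes "no accumulator was ever created" from
  // "an accumulator exists and happens to be null".
  private final boolean present;
  private final AccumT accum; // may legitimately be null even when present

  private AccumHolder(boolean present, AccumT accum) {
    this.present = present;
    this.accum = accum;
  }

  static <AccumT> AccumHolder<AccumT> absent() {
    return new AccumHolder<>(false, null);
  }

  static <AccumT> AccumHolder<AccumT> of(AccumT accum) {
    return new AccumHolder<>(true, accum); // accum may be null here
  }

  boolean isPresent() {
    return present;
  }

  AccumT get() {
    if (!present) {
      throw new IllegalStateException("no accumulator was ever created");
    }
    return accum;
  }
}

The state backend would then have to serialize this holder instead of the
bare AccumT, which is exactly the coder change that breaks savepoint
compatibility.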
On 15.10.20 11:51, Andrés Garagiola wrote:
Thanks Luke, Aljoscha
Let me know if I can help you to reproduce the problem.
In my case the state is never set to null, but I think it becomes null
while the job is stopping. Once I run the job again from the savepoint,
the state is recovered normally.
Let's show this with an example:
t0: Add input 1 => accu state [1] => output [1]
t1: Add input 2 => accu state [1,2] => output [1,2]
t2: stop job with savepoint => output [1,2] and *output []*
t3: run job from savepoint => accu state [1,2] => no output
t4: Add input 3 => accu state [1,2,3] => output [1,2,3]
Regards
On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <[email protected]>
wrote:
I'll take a look.
On 14.10.20 18:49, Luke Cwik wrote:
Assuming that null means the accumulator was never created is not
right, especially if null is a valid terminal state while the initial
accumulator value is non-null. This is uncommon but possible.
Filed
https://issues.apache.org/jira/browse/BEAM-11063.
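To make that concrete, here is a hypothetical combiner (not taken from
this thread) whose initial accumulator is non-null but for which null is
a valid, terminal accumulator state, meaning "the sum exceeded a cap":

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.Combine;

// Sums integers, but collapses the accumulator to null once the sum
// exceeds a cap; null is then a sticky, meaningful accumulator state.
class SaturatingSumFn extends Combine.CombineFn<Integer, Long, Long> {
  private static final long CAP = 1_000_000L;

  @Override
  public Long createAccumulator() {
    return 0L; // non-null initial accumulator
  }

  @Override
  public Long addInput(Long accum, Integer input) {
    if (accum == null) {
      return null; // already saturated, stays saturated
    }
    long sum = accum + input;
    return sum > CAP ? null : sum;
  }

  @Override
  public Long mergeAccumulators(Iterable<Long> accums) {
    long merged = 0L;
    for (Long a : accums) {
      if (a == null) {
        return null; // any saturated shard saturates the merge
      }
      merged += a;
    }
    return merged > CAP ? null : merged;
  }

  @Override
  public Long extractOutput(Long accum) {
    return accum; // null means "sum exceeded the cap"
  }

  @Override
  public Coder<Long> getAccumulatorCoder(CoderRegistry registry, Coder<Integer> inputCoder) {
    // The accumulator coder must tolerate null.
    return NullableCoder.of(VarLongCoder.of());
  }
}

A runner that interprets a null accumulator as "never created" would
silently replace that valid null with a fresh createAccumulator(), i.e. 0L.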
+Aljoscha Krettek <[email protected]> Is this something you can take a
look at?
On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <
[email protected]>
wrote:
Hi all,
I have this problem in a streaming pipeline using the Apache Flink
runner (Flink 1.19). I want to upgrade my job: I first stop the job
through the Flink API, creating a savepoint, and then I start the new
version through the Flink API, passing the savepoint path.
When the job stops, two new records are emitted. The first one is OK,
but the second one is an empty record.
My pipeline uses this window strategy:
Window<KV<String, TaggedEvent>> window =
    Window.<KV<String, TaggedEvent>>into(
            CalendarWindows.days(this.options.getWindowDays()))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(
            Duration.standardSeconds(this.options.getAllowedLateness()))
        .accumulatingFiredPanes();
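The windowed collection then feeds the custom combiner, roughly like
this (a simplified sketch; MyCombineFn and the events PCollection stand
in for my actual classes):

// Simplified sketch: applying the window above, then the combiner per key.
PCollection<KV<String, TaggedEvent>> events = ...; // upstream source
events
    .apply(window)
    .apply(Combine.perKey(new MyCombineFn()));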
I implemented a custom combiner, and I realized that the combiner's
state is null in the second output. The check at this line:
https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507
evaluates to false, and an empty accumulator is then created.
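Roughly, the shape of that code is (paraphrased for illustration, not
verbatim):

// Paraphrased shape of the Flink state read path (not verbatim):
AccumT current = flinkValueState.value(); // null if never written, or if null was stored
if (current == null) {
  // "never created" and "accumulator is null" fall into the same branch,
  // so a fresh, empty accumulator is created in both cases.
  current = combineFn.createAccumulator();
}
current = combineFn.addInput(current, input);
flinkValueState.update(current);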
Is this the expected behavior?
Thanks