Is this the behaviour we should go for? What's interesting is that all state backends create an empty accumulator when you try to read state for a key that doesn't exist yet. Is this even the expected behaviour? The first uncommented line would test for that.
As is, all StateInternals implementations that use this test base fail that test.
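
For readers following along, the read behaviour questioned above can be modelled with a small sketch. It is a simplified stand-in, not the Beam or Flink code: `SketchCombiner`, `NullCheckedCombiningState` and `ListCombiner` are hypothetical names, and the state for a single key is held in a plain field. It only illustrates that a null check on the stored value makes "never written" and "read" indistinguishable, so reading an untouched key still yields the output of a fresh accumulator.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a combine function; not the real Beam CombineFn. */
interface SketchCombiner<InputT, AccumT, OutputT> {
  AccumT createAccumulator();

  AccumT addInput(AccumT accumulator, InputT input);

  OutputT extractOutput(AccumT accumulator);
}

/** Simplified model of a combining state cell for a single key. */
final class NullCheckedCombiningState<InputT, AccumT, OutputT> {
  private final SketchCombiner<InputT, AccumT, OutputT> combiner;
  private AccumT stored; // null is treated as "nothing stored yet"

  NullCheckedCombiningState(SketchCombiner<InputT, AccumT, OutputT> combiner) {
    this.combiner = combiner;
  }

  void add(InputT input) {
    AccumT current = (stored != null) ? stored : combiner.createAccumulator();
    stored = combiner.addInput(current, input);
  }

  /** Falls back to a fresh accumulator when nothing is stored. */
  OutputT read() {
    AccumT current = (stored != null) ? stored : combiner.createAccumulator();
    return combiner.extractOutput(current);
  }
}

/** Collects inputs into a list, like the [1], [1,2] example in this thread. */
final class ListCombiner implements SketchCombiner<Integer, List<Integer>, List<Integer>> {
  @Override
  public List<Integer> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<Integer> addInput(List<Integer> accumulator, Integer input) {
    List<Integer> next = new ArrayList<>(accumulator);
    next.add(input);
    return next;
  }

  @Override
  public List<Integer> extractOutput(List<Integer> accumulator) {
    return new ArrayList<>(accumulator);
  }
}
```

In this model, calling `read()` before any `add()` invokes `createAccumulator()` and returns an empty list, which matches the empty record described in the thread.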
On 19.10.20 18:17, Luke Cwik wrote:
Aljoscha, I think having this fixed is more important than maintaining the update support since it leads to incorrect results.

On Fri, Oct 16, 2020 at 2:06 AM Andrés Garagiola wrote:
Hi Aljoscha,

I printed the stack trace in the createAccumulator() method of my combiner:

taskmanager_1 | java.lang.Thread.getStackTrace(Thread.java:1559)
taskmanager_1 | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:29)
taskmanager_1 | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:20)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.read(FlinkStateInternals.java:510)
taskmanager_1 | org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
taskmanager_1 | org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1059)
taskmanager_1 | org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
taskmanager_1 | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
taskmanager_1 | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
taskmanager_1 | org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
taskmanager_1 | org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
taskmanager_1 | org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
taskmanager_1 | org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:128)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:924)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:913)
taskmanager_1 | org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
taskmanager_1 | org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:702)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:681)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
taskmanager_1 | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
taskmanager_1 | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
taskmanager_1 | org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
taskmanager_1 | org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
taskmanager_1 | java.lang.Thread.run(Thread.java:748)

Let me know if there is anything else I can help with.

Regards

On Thu, Oct 15, 2020 at 8:34 PM Aljoscha Krettek wrote:
There are multiple things that come together here, I'm afraid:

1. There is additional output when stopping with a savepoint. It would be good to know where that comes from.
2. The state internals implementation does in fact seem wrong. We don't differentiate the cases of "never created an accumulator" and "my accumulator is null".

@Andrés, could you put breakpoints in your Combiner implementation and see when that second output happens and why it happens? (A stack trace would probably help.)

Regarding the state internals: we would basically need to introduce one more layer. Instead of keeping an AccumT we would need to keep an Option<AccumT> or something of that sort. I'm not saying Java Optional here, on purpose. However, changing the state type would have the consequence that savepoints are no longer compatible, i.e. you cannot restore a job from before this change using a Beam version after this change. So I'm very reluctant.

On 15.10.20 11:51, Andrés Garagiola wrote:
Thanks Luke, Aljoscha.

Let me know if I can help you to reproduce the problem. In my case the state is never set to null, but I think that it becomes null while the job is stopping. Once I run the job again from the savepoint, the state is recovered normally.

Let's show this with an example:

t0: Add input 1 => accu state [1] => output [1]
t1: Add input 2 => accu state [1,2] => output [1,2]
t2: stop job with savepoint => output [1,2,3] and *output []*
t3: run job from savepoint => accu state [1,2] => no output
t4: Add input 3 => accu state [1,2,3] => output [1,2,3]

Regards

On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek wrote:
I'll take a look.

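
The extra layer Aljoscha describes (an Option<AccumT>-like wrapper rather than a bare AccumT) could look roughly like the sketch below. This is only an illustration under assumptions: `StoredAccum` and `PresenceAwareState` are hypothetical names, the state is an in-memory field rather than Flink keyed state, and a real implementation would also need a coder for the wrapper, which is exactly what would break savepoint compatibility as noted in the thread.

```java
import java.util.function.Supplier;

/** Hypothetical wrapper: its presence records that an accumulator was written, even a null one. */
final class StoredAccum<AccumT> {
  private final AccumT value; // may legitimately be null

  private StoredAccum(AccumT value) {
    this.value = value;
  }

  static <AccumT> StoredAccum<AccumT> of(AccumT value) {
    return new StoredAccum<>(value);
  }

  AccumT get() {
    return value;
  }
}

/** Simplified state cell that stores the wrapper instead of the raw accumulator. */
final class PresenceAwareState<AccumT> {
  private StoredAccum<AccumT> stored; // null here only ever means "never created"

  boolean isEmpty() {
    return stored == null;
  }

  void write(AccumT accumulator) {
    stored = StoredAccum.of(accumulator);
  }

  /** Returns the stored accumulator (possibly null), or a fresh one if nothing was ever written. */
  AccumT readOrDefault(Supplier<AccumT> freshAccumulator) {
    return stored == null ? freshAccumulator.get() : stored.get();
  }

  void clear() {
    stored = null;
  }
}
```

With this shape, a caller can check `isEmpty()` before deciding whether to emit anything, and a null accumulator written on purpose is returned as null instead of being replaced by a fresh one.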
On 14.10.20 18:49, Luke Cwik wrote:
Assuming that null means that the accumulator was never created is not right, especially if null is a valid terminal state while the initial accumulator value is non-null. This is uncommon but possible.

Filed https://issues.apache.org/jira/browse/BEAM-11063.

+Aljoscha Krettek Is this something you can take a look at?

On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola wrote:
Hi all,

I have this problem in a stream pipeline using the runner Apache Flink 1.19. I want to upgrade my job. I first end the job using the Flink API, creating a savepoint, and then I start the new version using the Flink API, passing the savepoint path.

When the job ends, two new records are created. The first one is OK but the second one is an empty record.

My pipeline uses this window strategy:

    Window<KV<String, TaggedEvent>> window =
        Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
            .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
            .accumulatingFiredPanes();

I implemented a custom combiner, and I realized that the state of the combiner is null in the second output. The check on this line (https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507) evaluates to false, and then it creates an empty accumulator.

Is this the expected behavior?

Thanks
