Aljoscha, I think having this fixed is more important than maintaining the update (savepoint compatibility) support, since the current behavior leads to incorrect results.
On Fri, Oct 16, 2020 at 2:06 AM Andrés Garagiola <[email protected]> wrote:
> Hi Aljoscha,
>
> I printed the stack trace in the createAccumulator() method of my combiner:
>
> java.lang.Thread.getStackTrace(Thread.java:1559)
> com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:29)
> com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:20)
> org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.read(FlinkStateInternals.java:510)
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1059)
> org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
> org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:128)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:924)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:913)
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:702)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:681)
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> java.lang.Thread.run(Thread.java:748)
>
> Let me know if there is anything else I can help with.
>
> Regards
>
> On Thu, Oct 15, 2020 at 8:34 PM Aljoscha Krettek <[email protected]> wrote:
>
>> There are multiple things that come together here, I'm afraid:
>>
>> 1. There is additional output when stopping with a savepoint. It would
>> be good to know where that comes from.
>>
>> 2. The state internals implementation does in fact seem wrong. We don't
>> differentiate the cases of "never created an accumulator" and "my
>> accumulator is null".
>>
>> @Andrés, could you put breakpoints in your Combiner implementation and
>> see when that second output happens and why it happens? (A stack trace
>> would help, probably.)
>>
>> Regarding the state internals: we would basically need to introduce one
>> more layer; instead of keeping an AccumT we need to keep an
>> Option<AccumT> or something of that sort. Not saying Java Optional
>> here, on purpose. However, changing the state type would have the
>> consequence that savepoints are no longer compatible, i.e. you cannot
>> restore a job from before this change using a Beam version after this
>> change. So I'm very reluctant.
>>
>> On 15.10.20 11:51, Andrés Garagiola wrote:
>> > Thanks Luke, Aljoscha
>> >
>> > Let me know if I can help you to reproduce the problem.
>> > In my case the state is never set to null, but I think that it becomes
>> > null while the job is stopping. Once I run the job again from the
>> > savepoint, the state is recovered normally.
>> >
>> > Let's show this with an example:
>> >
>> > t0: Add input 1 => accu state [1] => output [1]
>> > t1: Add input 2 => accu state [1,2] => output [1,2]
>> > t2: stop job with savepoint => output [1,2] and *output []*
>> > t3: run job from savepoint => accu state [1,2] => no output
>> > t4: Add input 3 => accu state [1,2,3] => output [1,2,3]
>> >
>> > Regards
>> >
>> > On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <[email protected]> wrote:
>> >
>> >> I'll take a look.
>> >>
>> >> On 14.10.20 18:49, Luke Cwik wrote:
>> >>> Assuming that null means that the accumulator was never created is not
>> >>> right, especially if null is a valid terminal state while the
>> >>> initial accumulator value is non-null. This is uncommon but possible. Filed
>> >>> https://issues.apache.org/jira/browse/BEAM-11063.
>> >>>
>> >>> +Aljoscha Krettek <[email protected]> Is this something you can take a
>> >>> look at?
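
A minimal sketch, purely for illustration, of the extra layer Aljoscha describes above: keeping a small wrapper in state instead of a bare AccumT, so that "no accumulator was ever written" can be told apart from "the accumulator is null" (the case Luke points out). All class and method names below are made up; this is not the actual FlinkStateInternals code, and as noted above such a change to the state type would break savepoint compatibility.

    import java.io.Serializable;
    import org.apache.beam.sdk.transforms.Combine;

    // Hypothetical wrapper: an absent holder means "never written", while a
    // holder containing null means the combiner's accumulator really is null.
    final class AccumHolder<AccumT> implements Serializable {
      private final AccumT accum; // may legitimately be null

      AccumHolder(AccumT accum) {
        this.accum = accum;
      }

      AccumT get() {
        return accum;
      }
    }

    final class CombiningStateReadSketch {
      // Rough shape of a read path using the wrapper (illustration only):
      //   holder == null       -> state was never created, fall back to a fresh accumulator
      //   holder.get() == null -> null is a real accumulator value, pass it to extractOutput()
      static <InputT, AccumT, OutputT> OutputT read(
          AccumHolder<AccumT> holder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
        if (holder == null) {
          return combineFn.extractOutput(combineFn.createAccumulator());
        }
        return combineFn.extractOutput(holder.get());
      }

      private CombiningStateReadSketch() {}
    }
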
>> >>> On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <[email protected]> wrote:
>> >>>
>> >>>> Hi all,
>> >>>>
>> >>>> I have this problem in a streaming pipeline using the Apache Flink
>> >>>> runner, Flink 1.19. I want to upgrade my job. I first stop the job
>> >>>> through the Flink API, creating a savepoint, and then I start the new
>> >>>> version through the Flink API, passing the savepoint path.
>> >>>>
>> >>>> When the job stops, two new records are created. The first one is OK,
>> >>>> but the second one is an empty record.
>> >>>>
>> >>>> My pipeline uses this window strategy:
>> >>>>
>> >>>> Window<KV<String, TaggedEvent>> window =
>> >>>>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
>> >>>>         .triggering(AfterWatermark.pastEndOfWindow()
>> >>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
>> >>>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
>> >>>>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
>> >>>>         .accumulatingFiredPanes();
>> >>>>
>> >>>> I implemented a custom combiner, and I realized that the state of the
>> >>>> combiner is null in the second output. This line
>> >>>> (https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
>> >>>> evaluates to false, and then an empty accumulator is created.
>> >>>>
>> >>>> Is this the expected behavior?
>> >>>>
>> >>>> Thanks
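
For what it's worth, here is a rough illustration of why that branch produces an empty record. The real SensorMessumentCombinerFn is not shown in this thread, so the combiner below is a made-up, list-collecting stand-in that only mirrors the [1], [1,2], ... outputs from the example above: when FlinkCombiningState.read() finds no stored accumulator, it effectively returns extractOutput(createAccumulator()), which for a combiner like this is an empty list.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.Combine;

    // Made-up combiner that just collects its inputs into a list; it is only a
    // stand-in for the real combiner in the stack trace above.
    class CollectingCombineFn<T> extends Combine.CombineFn<T, List<T>, List<T>> {
      @Override
      public List<T> createAccumulator() {
        // This is what ends up being called from FlinkCombiningState.read()
        // when no accumulator is stored for the key/window.
        return new ArrayList<>();
      }

      @Override
      public List<T> addInput(List<T> accum, T input) {
        accum.add(input);
        return accum;
      }

      @Override
      public List<T> mergeAccumulators(Iterable<List<T>> accums) {
        List<T> merged = new ArrayList<>();
        for (List<T> a : accums) {
          merged.addAll(a);
        }
        return merged;
      }

      @Override
      public List<T> extractOutput(List<T> accum) {
        return accum;
      }
    }

    // With missing state, the firing effectively computes
    // extractOutput(createAccumulator()) == [], i.e. the empty record observed
    // when stopping with a savepoint.
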
