Aljoscha, I think having this fixed is more important than preserving the
ability to update jobs from existing savepoints, since the bug leads to
incorrect results.

On Fri, Oct 16, 2020 at 2:06 AM Andrés Garagiola <[email protected]>
wrote:

> Hi Aljoscha,
>
> I printed the stack trace in the createAccumulator() method of my combiner:
>
> taskmanager_1  | java.lang.Thread.getStackTrace(Thread.java:1559)
> taskmanager_1  | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:29)
> taskmanager_1  | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:20)
> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.read(FlinkStateInternals.java:510)
> taskmanager_1  | org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
> taskmanager_1  | org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1059)
> taskmanager_1  | org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
> taskmanager_1  | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
> taskmanager_1  | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> taskmanager_1  | org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> taskmanager_1  | org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
> taskmanager_1  | org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> taskmanager_1  | org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:128)
> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:924)
> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:913)
> taskmanager_1  | org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
> taskmanager_1  | org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:702)
> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:681)
> taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
> taskmanager_1  | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> taskmanager_1  | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
> taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
> taskmanager_1  | org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> taskmanager_1  | org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> taskmanager_1  | java.lang.Thread.run(Thread.java:748)
>
> Let me know if there is anything else I can help with.
>
> Regards
>
> On Thu, Oct 15, 2020 at 8:34 PM Aljoscha Krettek <[email protected]>
> wrote:
>
>> There are multiple things coming together here, I'm afraid:
>>
>> 1. There is additional output when stopping with a savepoint. It would
>> be good to know where that comes from.
>>
>> 2. The state internals implementation does in fact seem wrong. We don't
>> differentiate between the cases "an accumulator was never created" and
>> "my accumulator is null".
>>
>> @Andrés, could you put breakpoints in your Combiner implementation and
>> see when and why that second output happens? A stack trace would
>> probably help.
>>
>> Regarding the state internals: we would basically need to introduce one
>> more layer. Instead of keeping an AccumT, we would need to keep an
>> Option<AccumT> or something of that sort (I'm deliberately not saying
>> Java Optional here). However, changing the state type would make
>> savepoints incompatible, i.e. you could not restore a job from before
>> this change using a Beam version after this change. So I'm very
>> reluctant.
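One way to picture the extra layer described above is a minimal sketch, assuming a hypothetical `AccumHolder` wrapper (not a Beam or Flink class) that is stored in place of the bare accumulator. A missing holder means "no accumulator was ever created", while a present holder whose value is null means "the accumulator itself is null". Java's `Optional` cannot express the second case, because `Optional.of(null)` throws and `Optional.ofNullable(null)` is simply empty. The `Map` below is only a stand-in for runner state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

public class AccumHolderSketch {

    // Hypothetical wrapper: its presence means "an accumulator was created",
    // even when the wrapped accumulator value is null.
    static final class AccumHolder<A> {
        final A accum;

        AccumHolder(A accum) {
            this.accum = accum;
        }
    }

    // Stand-in for per-key state; a real runner would use its state backend.
    static final Map<String, AccumHolder<String>> state = new HashMap<>();

    // Falls back to a fresh accumulator only when no holder was ever stored.
    static <A, O> O read(AccumHolder<A> holder, Supplier<A> createAccumulator,
                         Function<A, O> extractOutput) {
        A accum = (holder != null) ? holder.accum : createAccumulator.get();
        return extractOutput.apply(accum);
    }

    public static void main(String[] args) {
        Supplier<String> create = () -> "initial";
        Function<String, String> extract = a -> a == null ? "<null accumulator>" : a;

        state.put("written-null", new AccumHolder<>(null));
        // A stored null accumulator is passed through as null.
        System.out.println(read(state.get("written-null"), create, extract));
        // A key that was never written falls back to createAccumulator().
        System.out.println(read(state.get("never-written"), create, extract));
    }
}
```

The cost Aljoscha points out still applies to any such design: the serialized state type changes, so state written without the wrapper could not be read back as-is.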
>>
>>
>> On 15.10.20 11:51, Andrés Garagiola wrote:
>> > Thanks, Luke and Aljoscha.
>> >
>> > Let me know if I can help you reproduce the problem.
>> > In my case the state is never set to null, but I think it becomes null
>> > while the job is stopping. Once I run the job again from the savepoint,
>> > the state is recovered normally.
>> >
>> > Let me show this with an example:
>> >
>> > t0: Add input 1 => accu state [1] => output [1]
>> > t1: Add input 2 => accu state [1,2] => output [1,2]
>> > t2: stop job with savepoint => output [1,2] and *output []*
>> > t3: run job from savepoint => accu state [1,2] => no output
>> > t4: Add input 3 => accu state [1,2,3] => output [1,2,3]
>> >
>> > Regards
>> >
>> > On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <[email protected]>
>> > wrote:
>> >
>> >> I'll take a look.
>> >>
>> >> On 14.10.20 18:49, Luke Cwik wrote:
>> >>> Assuming that null means the accumulator was never created is not
>> >>> right, especially if null is a valid terminal state while the
>> >>> initial accumulator value is non-null. This is uncommon but possible.
>> >>> Filed https://issues.apache.org/jira/browse/BEAM-11063.
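To make the point concrete, here is a minimal sketch with a made-up combiner (not the one from this thread, and not using Beam's `CombineFn` API): it checks whether all inputs are equal, starts from a non-null "no input yet" marker, and switches to null once two inputs disagree. A read that treats a stored null as "never created" reports "no input yet" instead of "disagreed". Inputs are assumed to be non-null.

```java
import java.util.Arrays;

public class NullTerminalCombiner {

    // Hypothetical non-null marker for "no input seen yet".
    static final String NO_INPUT = "<no input>";

    static String createAccumulator() {
        return NO_INPUT;
    }

    // Keeps the common value while all (non-null) inputs agree;
    // null is the terminal "inputs disagreed" state.
    static String addInput(String accum, String input) {
        if (accum == null) {
            return null;                         // stays null once inputs disagreed
        }
        if (accum.equals(NO_INPUT)) {
            return input;                        // first input becomes the candidate
        }
        return accum.equals(input) ? accum : null;
    }

    static String extractOutput(String accum) {
        return accum == null ? "disagreed" : accum;
    }

    // Read that assumes a null accumulator means "never created".
    static String readAssumingNullMeansEmpty(String stored) {
        return extractOutput(stored != null ? stored : createAccumulator());
    }

    public static void main(String[] args) {
        String accum = createAccumulator();
        for (String in : Arrays.asList("a", "b")) {
            accum = addInput(accum, in);
        }
        System.out.println(extractOutput(accum));              // disagreed
        System.out.println(readAssumingNullMeansEmpty(accum)); // <no input>
    }
}
```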
>> >>>
>> >>> +Aljoscha Krettek <[email protected]> Is this something you can take a
>> >>> look at?
>> >>>
>> >>> On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <[email protected]>
>> >>> wrote:
>> >>>
>> >>>> Hi all,
>> >>>>
>> >>>> I have this problem in a streaming pipeline using the Apache Flink
>> >>>> runner 1.19. I want to upgrade my job. I first stop the job through
>> >>>> the Flink API, creating a savepoint, and then I start the new version
>> >>>> through the Flink API, passing the savepoint path.
>> >>>>
>> >>>> When the job ends, two new records are created. The first one is OK,
>> >>>> but the second one is an empty record.
>> >>>>
>> >>>>
>> >>>> My pipeline uses this window strategy:
>> >>>>
>> >>>>
>> >>>>   Window<KV<String, TaggedEvent>> window =
>> >>>>       Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
>> >>>>           .triggering(AfterWatermark.pastEndOfWindow()
>> >>>>               .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
>> >>>>               .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
>> >>>>           .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
>> >>>>           .accumulatingFiredPanes();
>> >>>>
>> >>>>
>> >>>> I implemented a custom combiner, and I noticed that the combiner's
>> >>>> state is null when the second output is produced. This line
>> >>>> (https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
>> >>>> evaluates to false, and then it creates an empty accumulator.
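The branch described above can be modeled roughly as follows. This is a simplified, standalone sketch, not the actual `FlinkStateInternals` code: a `Map` stands in for the Flink state backend, `"sensor-1"` is a made-up key, and a hypothetical list-collecting combiner stands in for the custom combiner. When the stored accumulator is null, `read()` falls back to `extractOutput(createAccumulator())`, which for a list combiner is an empty list. That would be consistent with the `output []` seen when the job stops, assuming the state has already been cleared at that point.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombiningReadSketch {

    // Hypothetical combiner that collects inputs into a list.
    static List<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    static List<Integer> extractOutput(List<Integer> accum) {
        return new ArrayList<>(accum);
    }

    // Stand-in for keyed/windowed state; a missing key reads as null.
    static final Map<String, List<Integer>> state = new HashMap<>();

    // Mirrors the null check under discussion: null is treated as "no accumulator yet".
    static List<Integer> read(String key) {
        List<Integer> accum = state.get(key);
        if (accum != null) {
            return extractOutput(accum);
        }
        return extractOutput(createAccumulator());
    }

    public static void main(String[] args) {
        state.put("sensor-1", new ArrayList<>(Arrays.asList(1, 2)));
        System.out.println(read("sensor-1")); // [1, 2]
        state.remove("sensor-1");             // assumption: state cleared before the last firing
        System.out.println(read("sensor-1")); // []
    }
}
```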
>> >>>>
>> >>>>
>> >>>> Is this the expected behavior?
>> >>>>
>> >>>>
>> >>>> Thanks
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>
