Btw, I think we won't be done fixing the original problem with just
this. I suspect we would also need to touch the code that checks
whether to emit a result for a pane. Or maybe fixing the code that
checks for empty panes will already be sufficient.
Also, please check out my branch instead of just the commit I posted
earlier because I pushed another commit:
https://github.com/aljoscha/beam/tree/beam-11063-null-value-combinefn
On 11.11.20 12:21, Aljoscha Krettek wrote:
I assigned the issue to myself and whipped up a quick test case to check
what our expectations for this are:
https://github.com/aljoscha/beam/commit/cd0cea6c740846c0eb79091b0e7862487facf07b
Is this the behaviour we should go for? What's interesting is that all
state backends create an empty accumulator when you try to read state
for a key that doesn't exist yet. Is this even the expected behaviour?
The first uncommented line would test for that.
As is, all StateInternals implementations that use this test base fail
that test.
On 19.10.20 18:17, Luke Cwik wrote:
Aljoscha, I think having this fixed is more important than maintaining
the update support, since it leads to incorrect results.
On Fri, Oct 16, 2020 at 2:06 AM Andrés Garagiola <[email protected]> wrote:
Hi Aljoscha,
I printed the stack trace in the createAccumulator() method of my
combiner:
taskmanager_1 | java.lang.Thread.getStackTrace(Thread.java:1559)
taskmanager_1 | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:29)
taskmanager_1 | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:20)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.read(FlinkStateInternals.java:510)
taskmanager_1 | org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
taskmanager_1 | org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1059)
taskmanager_1 | org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
taskmanager_1 | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
taskmanager_1 | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
taskmanager_1 | org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
taskmanager_1 | org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
taskmanager_1 | org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
taskmanager_1 | org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:128)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:924)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:913)
taskmanager_1 | org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
taskmanager_1 | org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:702)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:681)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
taskmanager_1 | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
taskmanager_1 | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
taskmanager_1 | org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
taskmanager_1 | org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
taskmanager_1 | java.lang.Thread.run(Thread.java:748)
Let me know if there is anything else I can help with.
Regards
On Thu, Oct 15, 2020 at 8:34 PM Aljoscha Krettek <[email protected]> wrote:
There are multiple things that come together here, I'm afraid:
1. There is additional output when stopping with a savepoint. It would
be good to know where that comes from.
2. The state internals implementation does in fact seem wrong. We don't
differentiate the cases of "never created an accumulator" and "my
accumulator is null".
@Andrés, could you put breakpoints in your Combiner implementation and
see when that second output happens and why it happens? A stack trace
would probably help.
Regarding the state internals: we would basically need to introduce one
more layer. Instead of keeping an AccumT, we need to keep an
Option<AccumT> or something of that sort. I'm deliberately not saying
Java Optional here. However, changing the state type would have the
consequence that savepoints are no longer compatible, i.e. you cannot
restore a job from before this change using a Beam version after this
change. So I'm very reluctant.
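A minimal sketch of that extra layer, written in plain Java rather than against Beam or Flink types. The `AccumulatorSlot` holder and `readOutput` method are hypothetical names for illustration. The sketch uses a hand-rolled holder because `java.util.Optional` cannot represent a present-but-null value. The holder records whether an accumulator was ever written, so a stored null is no longer confused with "never created". Because the holder changes what would be serialized into state, it would carry exactly the savepoint-compatibility cost described above.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not Beam's or Flink's API): keep a presence flag next
 * to the accumulator so "never created" and "accumulator is null" differ.
 */
public class AccumulatorSlotSketch {

  /** Wraps an accumulator that may legitimately be null. */
  static final class AccumulatorSlot<AccumT> {
    private final boolean present;
    private final AccumT value;

    private AccumulatorSlot(boolean present, AccumT value) {
      this.present = present;
      this.value = value;
    }

    /** No accumulator was ever written for this key/window. */
    static <AccumT> AccumulatorSlot<AccumT> absent() {
      return new AccumulatorSlot<>(false, null);
    }

    /** An accumulator was written; the value itself may be null. */
    static <AccumT> AccumulatorSlot<AccumT> of(AccumT value) {
      return new AccumulatorSlot<>(true, value);
    }

    boolean isPresent() {
      return present;
    }

    AccumT get() {
      return value;
    }
  }

  /** Creates a fresh accumulator only if none was ever stored. */
  static <AccumT, OutputT> OutputT readOutput(
      AccumulatorSlot<AccumT> slot,
      Supplier<AccumT> createAccumulator,
      Function<AccumT, OutputT> extractOutput) {
    AccumT accum = slot.isPresent() ? slot.get() : createAccumulator.get();
    return extractOutput.apply(accum);
  }
}
```

With this holder, an absent slot falls back to the initial accumulator. A slot that explicitly stores null passes that null through to `extractOutput` unchanged.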
On 15.10.20 11:51, Andrés Garagiola wrote:
Thanks Luke, Aljoscha
Let me know if I can help you to reproduce the problem.
In my case the state is never set to null, but I think it becomes null
while the job is stopping. Once I run the job again from the savepoint,
the state is recovered normally.
Let me illustrate with an example:
t0: Add input 1 => accumulator state [1] => output [1]
t1: Add input 2 => accumulator state [1,2] => output [1,2]
t2: stop job with savepoint => output [1,2] and *output []*
t3: run job from savepoint => accumulator state [1,2] => no output
t4: Add input 3 => accumulator state [1,2,3] => output [1,2,3]
Regards
On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <[email protected]> wrote:
I'll take a look.
On 14.10.20 18:49, Luke Cwik wrote:
Assuming that null means the accumulator was never created is not
right, especially if null is a valid terminal state while the initial
accumulator value is non-null. This is uncommon but possible.
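To make that failure mode concrete, here is a hypothetical combiner in plain Java; it is not Beam's `CombineFn`, and all names are made up. Its initial accumulator is `0L`, but once a negative input "poisons" the sum, the accumulator becomes null for good. A read that treats a null accumulator as "never created" resurrects the initial value.

```java
/**
 * Hypothetical combiner where null is a valid terminal accumulator while the
 * initial accumulator (0L) is non-null. Illustration only.
 */
public class NullTerminalStateSketch {

  /** Initial accumulator: a running sum starting at 0. */
  static Long createAccumulator() {
    return 0L;
  }

  /** A negative input poisons the sum; null then marks the poisoned state permanently. */
  static Long addInput(Long accum, long input) {
    if (accum == null || input < 0) {
      return null;
    }
    return accum + input;
  }

  static String extractOutput(Long accum) {
    return accum == null ? "POISONED" : Long.toString(accum);
  }

  /** Read logic that (wrongly, for this combiner) treats null as "never created". */
  static String naiveRead(Long storedAccum) {
    Long accum = storedAccum != null ? storedAccum : createAccumulator();
    return extractOutput(accum);
  }
}
```

After the inputs 5 and -1, the correct output is `"POISONED"`, but `naiveRead` returns `"0"`.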
Filed https://issues.apache.org/jira/browse/BEAM-11063.
+Aljoscha Krettek <[email protected]> Is this something you can take a
look at?
On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <[email protected]> wrote:
Hi all,
I have this problem in a streaming pipeline using the Apache Flink
runner (Flink 1.19). I want to upgrade my job: I first stop the job
through the Flink API, creating a savepoint, and then I start the new
version through the Flink API, passing the savepoint path.
When the job ends, two new records are emitted. The first one is OK,
but the second one is an empty record.
My pipeline uses this window strategy:

    Window<KV<String, TaggedEvent>> window =
        Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
            .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
            .accumulatingFiredPanes();

I implemented a custom combiner, and I realized that the state of the
combiner is null in the second output. This line
(https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
evaluates to false, and then it creates an empty accumulator.
Is this the expected behavior?
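A sketch of how that check can surface as an empty record. It uses a hypothetical list-collecting combiner loosely modelled on the [1,2] example earlier in this thread. The `read` method paraphrases the null check described here and is not copied from FlinkStateInternals. When no accumulator is stored, a fresh one is created, and extracting its output yields an empty list.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical list-collecting combiner. The read logic paraphrases the
 * described null check; it is not the actual Beam/Flink source.
 */
public class EmptyPaneSketch {

  static List<Integer> createAccumulator() {
    return new ArrayList<>();
  }

  static List<Integer> addInput(List<Integer> accum, int input) {
    accum.add(input);
    return accum;
  }

  static List<Integer> extractOutput(List<Integer> accum) {
    return new ArrayList<>(accum);
  }

  /** If no accumulator is stored (null), a fresh, empty one is created. */
  static List<Integer> read(List<Integer> storedAccum) {
    List<Integer> accum = storedAccum != null ? storedAccum : createAccumulator();
    return extractOutput(accum);
  }
}
```

A stored accumulator holding [1,2] reads back as [1,2]. A missing (null) one reads back as [], which matches the shape of the second, empty record.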
Thanks