[ https://issues.apache.org/jira/browse/BEAM-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720186#comment-16720186 ]
Maximilian Michels commented on BEAM-6227:
------------------------------------------

It appears that ValidatesRunner is correctly set up to run in both batch and streaming mode, and it passes. The tests don't seem to catch this.

> FlinkRunner errors if GroupByKey contains null values (streaming mode only)
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-6227
>                 URL: https://issues.apache.org/jira/browse/BEAM-6227
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.9.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.10.0
>
>
> Apparently this passed ValidatesRunner because it is only present in streaming mode:
> {noformat}
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> options.setRunner(FlinkRunner.class);
> // force streaming mode
> options.setStreaming(true);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply(GenerateSequence.from(0).to(100))
>     .apply(Window.into(FixedWindows.of(Duration.millis(10))))
>     .apply(ParDo.of(
>         new DoFn<Long, KV<String, Void>>() {
>           @ProcessElement
>           public void processElement(ProcessContext pc) {
>             pc.output(KV.of("hello", null));
>           }
>         }
>     ))
>     .apply(GroupByKey.create());
> pipeline.run();
> {noformat}
> Throws:
> {noformat}
> Caused by: java.lang.RuntimeException: Error adding to bag state.
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:299)
>     at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> Caused by: java.lang.NullPointerException: You cannot add null to a ListState.
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:89)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:297)
>     at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:460)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Will do a follow-up for running ValidatesRunner in streaming mode.
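The root cause in the trace is that the Flink heap list state backing Beam's bag state refuses null elements (`Preconditions.checkNotNull` in `HeapListState.add`), while a `GroupByKey` over `KV<String, Void>` hands it exactly that: every value is `null`. The Java sketch below reproduces that failure mode in isolation. `NullRejectingListState`, `Boxed`, and `NullTolerantBagState` are hypothetical classes, not Flink or Beam APIs, and the boxing wrapper is only one generic way to keep nulls away from such a state; it is not necessarily how the fix for 2.10.0 works.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class NullBagStateSketch {

  /** Hypothetical list state that, like the Flink heap list state in the trace, rejects null elements. */
  static final class NullRejectingListState<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) {
      // Same kind of check that raised "You cannot add null to a ListState."
      Objects.requireNonNull(value, "You cannot add null to a ListState.");
      elements.add(value);
    }

    List<T> get() {
      return elements;
    }
  }

  /** Hypothetical non-null holder, so the underlying list state never sees a null element. */
  static final class Boxed<T> {
    final T value;

    Boxed(T value) {
      this.value = value;
    }
  }

  /** Hypothetical bag state that tolerates null values by boxing them before delegating. */
  static final class NullTolerantBagState<T> {
    private final NullRejectingListState<Boxed<T>> delegate = new NullRejectingListState<>();

    void add(T value) {
      delegate.add(new Boxed<>(value)); // the box itself is never null
    }

    List<T> read() {
      List<T> out = new ArrayList<>();
      for (Boxed<T> boxed : delegate.get()) {
        out.add(boxed.value); // unbox, restoring any null values
      }
      return out;
    }
  }

  public static void main(String[] args) {
    // Adding a Void value directly, as GroupByKey does for KV.of("hello", null), fails.
    NullRejectingListState<Void> direct = new NullRejectingListState<>();
    try {
      direct.add(null);
      System.out.println("added");
    } catch (NullPointerException e) {
      System.out.println("NPE: " + e.getMessage());
    }

    // The boxing wrapper accepts and returns null values.
    NullTolerantBagState<Void> wrapped = new NullTolerantBagState<>();
    wrapped.add(null);
    wrapped.add(null);
    System.out.println("wrapped size: " + wrapped.read().size());
  }
}
```

Running `main` prints `NPE: You cannot add null to a ListState.` for the direct add and `wrapped size: 2` for the boxed bag. The trade-off of boxing is one extra object per element; a sentinel value that stands in for null is another option with different costs.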