I did a little experiment, and I was able to reproduce this if I use the sum aggregator on KeyedStream to do the counting.
However, if I implement my own counting in a KeyedProcessFunction, or if I use the Table API, I get correct results with RuntimeExecutionMode.BATCH, though the results are produced incrementally, as they would be in streaming mode.

In FLIP-134: Batch execution for the DataStream API [1], it was decided to deprecate these relational methods, such as sum, on KeyedStream. But I don't know whether this means this behavior is to be expected or not. I've cc'ed @Aljoscha Krettek, who should be able to shed some light on this.

Best,
David

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API

On Wed, Dec 23, 2020 at 8:22 PM Derek Sheng wrote:
> Hi team,
>
> Recently I have been exploring the new features of Flink 1.12 with batch execution.
>
> I wrote a classic WordCount program locally that reads from a text file and counts the words (almost the same as the one in the Flink GitHub repo: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala). After reading https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html, I added `env.setRuntimeMode(RuntimeExecutionMode.BATCH);` after declaring "env" to make it execute in BATCH mode. After running the code, the printed results showed only the final counts instead of incremental results, which is expected. *But I also noticed that none of the words that appear only once were printed.* I have tried different things, such as wrapping the word in a case class, and I read more of the documentation to see if I had missed anything, but I still haven't been able to figure it out. (I also tried the default examples that come with the Flink package and got the same results; with the DataSet API I do not see this issue.)
>
> Is there anything extra that users need to specify or be aware of when using BATCH execution mode in the DataStream API with Flink 1.12, or is this a bug? The Flink version I used is 1.12 with Scala 2.11 (I also tried Java 1.8 and observed the same issue).
>
> Please let me know if you need any other info to help diagnose this. Thank you very much!
>
> Best,
>
> Derek Sheng
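The following is a minimal model of the per-key counting approach mentioned above (a KeyedProcessFunction instead of the sum aggregator), which gives incremental, streaming-style output. It is plain Java, not Flink code. The class `IncrementalWordCount` and its `run` method are hypothetical. A `HashMap` stands in for the per-key `ValueState<Long>` that such a function would typically keep. Each word updates its key's count and a record is emitted immediately, so a word seen only once still yields `(word,1)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (NOT the Flink API) of per-key counting with
 * one counter per key, emitting an updated (word,count) record for every input.
 */
final class IncrementalWordCount {

    /** Returns the emitted records, in arrival order, formatted as "(word,count)". */
    static List<String> run(List<String> words) {
        // Stands in for Flink's per-key ValueState<Long>: one counter per distinct word.
        Map<String, Long> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            long count = state.getOrDefault(word, 0L) + 1;
            state.put(word, count);
            // Emit on every element, so a word that appears once still produces (word,1).
            emitted.add("(" + word + "," + count + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("to", "be", "or", "not", "to", "be")));
    }
}
```

Running `main` prints `[(to,1), (be,1), (or,1), (not,1), (to,2), (be,2)]`. Every word, including the singletons "or" and "not", is emitted, and repeated words are emitted again with updated counts. By contrast, the sum-based job in BATCH mode printed only final counts and dropped the words that appear once.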
