I did a little experiment, and I was able to reproduce this if I use the
sum aggregator on KeyedStream to do the counting.

However, if I implement my own counting in a KeyedProcessFunction, or if I
use the Table API, I get correct results with RuntimeExecutionMode.BATCH --
though the results are produced incrementally, as they would be in
streaming mode.

In FLIP-134: Batch execution for the DataStream API [1] it was decided to
deprecate these relational methods -- such as sum -- on KeyedStream. But I
don't know if this means this behavior is to be expected, or not.

I've cc'ed @Aljoscha Krettek <[email protected]>, who should be able to
shed some light on this.

Best,
David

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API

On Wed, Dec 23, 2020 at 8:22 PM Derek Sheng <[email protected]>
wrote:

> Hi team,
>
> Recently I am trying to explore the new features of Flink 1.12 with Batch
> Execution.
>
> I locally wrote a classic WordCount program to read from text file and
> count the words (almost same as the one in Flink Github repo
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala),
> and after reading
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
> I added `env.setRuntimeMode(RuntimeExecutionMode.BATCH);` after declare the
> "env" to make it execute under BATCH mode. After running the code, the
> printed results showed only final count results instead of incremental
> results, which is expected. *But I also notice, all the words that only
> appear once have NOT been printed out*. I have tried different things
> like wrap the word in a case class etc, and read more details and see if I
> have missed anything but still not able to figure out (And I have tried the
> default examples come with the Flink package and got same results, and with
> using DataSet API I do not see this issue).
>
> Is there anything extra user need to specify or notice when using BATCH
> execution mode in datastream API with Flink 1.12 or this is kind of a bug
> please? The flink version I used is 1.12 with scala 2.11 (also tried java
> 1.8 and observed same issue)
>
> Please let me know if you need other info to help diagnose. Thank you very
> much!
>
> Bests,
>
> Derek Sheng
>

Reply via email to