Hi Luke,

What does your workflow look like?
The one I included in my email generates what I’d expect.

— Ken

> On Apr 29, 2023, at 7:22 PM, Luke Xiong <leix...@gmail.com> wrote:
>
> Hi Marco and Ken,
>
> Thanks for the tips. I tried setting the runtime mode to BATCH and it seems
> to work. However, I notice there are duplicate keys in the resulting stream.
> For example, it has
>
> (hello, 2)
> (hello, 2)
>
> instead of
>
> (hello, 4)
>
> I suspect there might be a bug in the equals method of the keyed object, but
> that doesn't seem likely, because I can get the expected result with
>
> .collect(Collectors.groupingByConcurrent(Function.identity(),
>     Collectors.counting()))
>
> on the same stream using the Java Stream API.
>
> Is there any other reason that could cause this, and what should I do to get
> a stream with only one element per key?
>
> -lxiong
>
> On Sat, Apr 29, 2023 at 5:24 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Luke,
>
> If you’re reading from a bounded source (like a file, and you’re not doing
> any monitoring of the directory or the file to load more data), then yes,
> you can get a final count using a stream.
>
> The key point is that the Flink file source will send out a
> Watermark.MAX_WATERMARK value when it is done, and this will trigger any
> pending aggregations.
>
> Though you do need to make sure your workflow mode is either explicitly
> BATCH, or AUTOMATIC.
>
> Something like…
>
> final StreamExecutionEnvironment env = ...;
> env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
>
> DataStream<String> lines = ...;
>
> lines
>     .flatMap(new SplitLineFunction())
>     .keyBy(t -> t.f0)
>     .sum(1)
>     .print();
>
> env.execute();
>
> …with a typical line splitter, cheesy version like...
> > private static class SplitLineFunction
> >         implements FlatMapFunction<String, Tuple2<String, Integer>> {
> >
> >     @Override
> >     public void flatMap(String in, Collector<Tuple2<String, Integer>> out)
> >             throws Exception {
> >         for (String word : in.split(" ")) {
> >             out.collect(Tuple2.of(word, 1));
> >         }
> >     }
> > }
>
> — Ken
>
>> On Apr 28, 2023, at 10:56 PM, Luke Xiong <leix...@gmail.com> wrote:
>>
>> Dear experts,
>>
>> Is it possible to write a WordCount job that uses the DataStream API, but
>> make it behave like the batch version WordCount example?
>>
>> More specifically, I hope the job can get a DataStream of the final (word,
>> count) records when fed a text file.
>>
>> For example, given a text file:
>>
>> ```input.txt
>> hello hello world hello world
>> hello world world world hello world
>> ```
>>
>> In the flink WordCount examples, the batch version outputs:
>>
>> ```batch.version.output
>> hello 5
>> world 6
>> ```
>>
>> while the stream version outputs:
>>
>> ```stream.version.output
>> (hello,1)
>> (hello,2)
>> (world,1)
>> (hello,3)
>> (world,2)
>> (hello,4)
>> (world,3)
>> (world,4)
>> (world,5)
>> (hello,5)
>> (world,6)
>> ```
>>
>> Is it possible to have a DataStream that only has two elements: (hello, 5)
>> and (world, 6)?
>>
>> Regards,
>> Luke
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
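On the duplicate keys reported above: Flink's keyBy() assigns each record to a key group by the key's hash code, and keys are serialized between operators, so a custom key class must override both equals() and hashCode(), with a hash that is deterministic for equal keys in every JVM. A class that overrides equals() but leaves the default identity hashCode() (or hashes a mutable or array field) can split one logical key across groups even though equality tests pass. The sketch below is plain Java, not Flink code; BadKey and GoodKey are hypothetical names, and an ordinary HashMap stands in for hash-based grouping to illustrate the contract.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class KeyContractDemo {

    // Hypothetical broken key: equals() is overridden, hashCode() is NOT,
    // so two equal keys usually get different (identity) hash codes.
    static class BadKey {
        final String word;
        BadKey(String word) { this.word = word; }
        @Override
        public boolean equals(Object o) {
            return o instanceof BadKey && ((BadKey) o).word.equals(word);
        }
        // hashCode() intentionally not overridden.
    }

    // Same key with both methods overridden consistently.
    static class GoodKey {
        final String word;
        GoodKey(String word) { this.word = word; }
        @Override
        public boolean equals(Object o) {
            return o instanceof GoodKey && ((GoodKey) o).word.equals(word);
        }
        @Override
        public int hashCode() { return Objects.hash(word); }
    }

    public static void main(String[] args) {
        // Hash-based grouping: with BadKey, two equal "hello" keys
        // typically land in separate buckets, so the counts never merge.
        Map<BadKey, Integer> bad = new HashMap<>();
        bad.merge(new BadKey("hello"), 2, Integer::sum);
        bad.merge(new BadKey("hello"), 2, Integer::sum);
        System.out.println("bad key groups: " + bad.size());
        // typically 2: (hello, 2) appears twice, as in the thread

        Map<GoodKey, Integer> good = new HashMap<>();
        good.merge(new GoodKey("hello"), 2, Integer::sum);
        good.merge(new GoodKey("hello"), 2, Integer::sum);
        System.out.println("good key groups: " + good.size());  // 1 -> (hello, 4)
    }
}
```

Note that a single-JVM Stream-API check like Luke's groupingByConcurrent test can mask the problem when the equal keys in the local stream are the same object instances, since an identity hash is then self-consistent; Flink's serialize/deserialize round-trip creates distinct copies, so only a properly overridden hashCode() keeps equal keys together.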