Hi Luke,

Without more details, I don’t have a good explanation for why you’re seeing duplicate results.
— Ken

> On Apr 29, 2023, at 7:38 PM, Luke Xiong <leix...@gmail.com> wrote:
>
> Hi Ken,
>
> My workflow looks like this:
>
> dataStream
>     .map(A -> B)
>     .flatMap(B -> some Tuple2.of(C, 1))
>     .keyBy(t -> t.f0) // a.k.a. C
>     .sum(1)
>     .map(Tuple2.of(C, <count>) -> d)
>     ;
>
> This is just illustrative; I am not writing a WordCount job either.
>
> - Luke
>
> On Sat, Apr 29, 2023 at 10:31 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi Luke,
>
> What does your workflow look like?
>
> The one I included in my email generates what I’d expect.
>
> — Ken
>
>> On Apr 29, 2023, at 7:22 PM, Luke Xiong <leix...@gmail.com> wrote:
>>
>> Hi Marco and Ken,
>>
>> Thanks for the tips. I tried setting the runtime mode to BATCH and it seems to work. However, I notice there are duplicate keys in the resulting stream. For example, it has
>>
>> (hello, 2)
>> (hello, 2)
>>
>> instead of
>>
>> (hello, 4)
>>
>> I suspect there might be a bug in the equals method of the keyed object, but that doesn't seem likely, because I get the expected result with
>>
>> .collect(Collectors.groupingByConcurrent(Function.identity(), Collectors.counting()))
>>
>> on the same stream using the Java Stream API. Is there any other reason that could cause this, and what should I do to get a stream with only one element per key?
>>
>> -lxiong
>>
>> On Sat, Apr 29, 2023 at 5:24 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> Hi Luke,
>>
>> If you’re reading from a bounded source (like a file, and you’re not doing any monitoring of the directory or the file to load more data), then yes, you can get a final count using a stream.
>>
>> The key point is that the Flink file source will send out a Watermark.MAX_WATERMARK value when it is done, and this will trigger any pending aggregations.
>>
>> Though you do need to make sure your workflow's runtime mode is either explicitly BATCH, or AUTOMATIC.
>>
>> Something like…
>>
>> final StreamExecutionEnvironment env = ...;
>> env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
>>
>> DataStream<String> lines = ...;
>>
>> lines
>>     .flatMap(new SplitLineFunction())
>>     .keyBy(t -> t.f0)
>>     .sum(1)
>>     .print();
>>
>> env.execute();
>>
>> …with a typical line splitter, cheesy version like...
>>
>> private static class SplitLineFunction
>>         implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>
>>     @Override
>>     public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
>>         for (String word : in.split(" ")) {
>>             out.collect(Tuple2.of(word, 1));
>>         }
>>     }
>> }
>>
>> — Ken
>>
>>> On Apr 28, 2023, at 10:56 PM, Luke Xiong <leix...@gmail.com> wrote:
>>>
>>> Dear experts,
>>>
>>> Is it possible to write a WordCount job that uses the DataStream API, but make it behave like the batch version of the WordCount example?
>>>
>>> More specifically, I hope the job can get a DataStream of the final (word, count) records when fed a text file.
>>>
>>> For example, given a text file:
>>> ```input.txt
>>> hello hello world hello world
>>> hello world world world hello world
>>> ```
>>>
>>> In the Flink WordCount examples, the batch version outputs:
>>> ```batch.version.output
>>> hello 5
>>> world 6
>>> ```
>>>
>>> while the stream version outputs:
>>> ```stream.version.output
>>> (hello,1)
>>> (hello,2)
>>> (world,1)
>>> (hello,3)
>>> (world,2)
>>> (hello,4)
>>> (world,3)
>>> (world,4)
>>> (world,5)
>>> (hello,5)
>>> (world,6)
>>> ```
>>> Is it possible to have a DataStream that only has two elements: (hello, 5) and (world, 6)?
>>>
>>> Regards,
>>> Luke

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
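For reference, the Java Stream API cross-check Luke mentions can be run standalone against the sample input from the original question. This is a minimal sketch using only the JDK (the class and method names here are illustrative, not from the thread); it shows the final per-key counts that the Flink job in BATCH mode should also produce:

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountCheck {

    // Group words by identity and count occurrences — the same
    // groupingByConcurrent collector Luke used as a sanity check.
    static Map<String, Long> countWords(Stream<String> words) {
        return words.collect(
                Collectors.groupingByConcurrent(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Same two lines as input.txt in the original question.
        Map<String, Long> counts = countWords(
                Stream.of("hello hello world hello world",
                          "hello world world world hello world")
                      .flatMap(line -> Stream.of(line.split(" "))));

        System.out.println(counts.get("hello")); // 5
        System.out.println(counts.get("world")); // 6
    }
}
```

Unlike the incremental `(hello,1) (hello,2) …` output of a streaming `sum(1)`, this collector only ever materializes one entry per key, which is why it is a useful baseline when checking whether duplicate keys come from the job graph or from a broken `equals`/`hashCode` on the key type.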