Hi Luke,

Without more details, I don’t have a good explanation for why you’re seeing 
duplicate results.

— Ken

> On Apr 29, 2023, at 7:38 PM, Luke Xiong <leix...@gmail.com> wrote:
> 
> Hi Ken,
> 
> My workflow looks like this:
> 
> dataStream
>     .map(A -> B)
>     .flatMap(B -> Tuple2.of(C, 1))
>     .keyBy(t -> t.f0) // i.e., key by C
>     .sum(1)
>     .map(Tuple2.of(C, <count>) -> d)
>     ;
> 
> This is just illustrative; I'm not actually writing a WordCount job.
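> 
> In case it's useful, here's roughly what that shape looks like compiled, 
> if the key C were a String (SomeEvent, toB, keyOf, and toD are 
> hypothetical stand-ins for my real types):
> 
>     DataStream<SomeEvent> dataStream = ...;
> 
>     dataStream
>         .map(a -> toB(a))                                 // A -> B
>         .flatMap((B b, Collector<Tuple2<String, Integer>> out) ->
>                 out.collect(Tuple2.of(keyOf(b), 1)))      // B -> (C, 1)
>         .returns(Types.TUPLE(Types.STRING, Types.INT))    // needed with lambdas
>         .keyBy(t -> t.f0)                                 // key by C
>         .sum(1)
>         .map(t -> toD(t));                                // (C, count) -> d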
> 
> - Luke
> 
> 
> On Sat, Apr 29, 2023 at 10:31 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi Luke,
> 
> What’s your workflow look like?
> 
> The one I included in my email generates what I’d expect.
> 
> — Ken
> 
>> On Apr 29, 2023, at 7:22 PM, Luke Xiong <leix...@gmail.com> wrote:
>> 
>> Hi Marco and Ken,
>> 
>> Thanks for the tips. I tried setting the runtime mode to BATCH and it seems 
>> to work. However, I noticed there are duplicate keys in the resulting 
>> stream. For example, it has
>> (hello, 2)
>> (hello, 2)
>> instead of
>> (hello, 4)
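>> 
>> (By "setting runtime mode to BATCH" I mean this call on the 
>> StreamExecutionEnvironment:
>> 
>>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>> )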
>> 
>> I suspected a bug in the equals method of the keyed object, but that seems 
>> unlikely, because I get the expected result with
>> 
>>     .collect(Collectors.groupingByConcurrent(Function.identity(),
>>             Collectors.counting()))
>> 
>> on the same data using the Java Stream API.
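>> 
>> In full, that check looks like this (words here is a hypothetical 
>> stand-in for the same data as a java.util.stream.Stream):
>> 
>>     import java.util.concurrent.ConcurrentMap;
>>     import java.util.function.Function;
>>     import java.util.stream.Collectors;
>>     import java.util.stream.Stream;
>> 
>>     Stream<String> words = ...;
>>     ConcurrentMap<String, Long> counts = words.collect(
>>             Collectors.groupingByConcurrent(Function.identity(),
>>                     Collectors.counting()));
>>     // e.g. counts.get("hello") == 4, with one entry per key
>> 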
>> Is there any other reason this could happen, and what should I do to get a 
>> stream with only one element per key?
>> -lxiong
>> 
>> On Sat, Apr 29, 2023 at 5:24 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> Hi Luke,
>> 
>> If you’re reading from a bounded source (like a file, where you’re not 
>> monitoring the directory or the file for more data), then yes, you can get 
>> a final count using a stream.
>> 
>> The key point is that the Flink file source will send out a 
>> Watermark.MAX_WATERMARK value when it is done, and this will trigger any 
>> pending aggregations.
>> 
>> Though you do need to make sure your execution mode is either explicitly 
>> BATCH, or AUTOMATIC (which picks batch execution for bounded sources).
>> 
>> Something like…
>> 
>>         final StreamExecutionEnvironment env = ...;
>>         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
>>         
>>         DataStream<String> lines = ...;
>>         
>>         lines
>>             .flatMap(new SplitLineFunction())
>>             .keyBy(t -> t.f0)
>>             .sum(1)
>>             .print();
>>         
>>         env.execute();
>> 
>> …with a typical line splitter, cheesy version like...
>> 
>>     private static class SplitLineFunction
>>             implements FlatMapFunction<String, Tuple2<String, Integer>> {
>> 
>>         @Override
>>         public void flatMap(String in, Collector<Tuple2<String, Integer>> out)
>>                 throws Exception {
>>             // Emit a (word, 1) tuple for every blank-separated token.
>>             for (String word : in.split(" ")) {
>>                 out.collect(Tuple2.of(word, 1));
>>             }
>>         }
>>     }
>> 
>> — Ken
>> 
>>> On Apr 28, 2023, at 10:56 PM, Luke Xiong <leix...@gmail.com> wrote:
>>> 
>>> Dear experts,
>>> 
>>> Is it possible to write a WordCount job that uses the DataStream API, but 
>>> have it behave like the batch version of the WordCount example?
>>> 
>>> More specifically, I'd like the job to produce a DataStream of the final 
>>> (word, count) records when fed a text file.
>>> 
>>> For example, given a text file:
>>> ```input.txt
>>> hello hello world hello world
>>> hello world world world hello world
>>> ```
>>> 
>>> In the flink WordCount examples, the batch version outputs:
>>> ```batch.version.output
>>> hello 5
>>> world 6
>>> ```
>>> 
>>> while the stream version outputs:
>>> ```stream.version.output
>>> (hello,1)
>>> (hello,2)
>>> (world,1)
>>> (hello,3)
>>> (world,2)
>>> (hello,4)
>>> (world,3)
>>> (world,4)
>>> (world,5)
>>> (hello,5)
>>> (world,6)
>>> ```
>>> Is it possible to have a DataStream that only has two elements: (hello, 5) 
>>> and (world, 6)?
>>> 
>>> Regards,
>>> Luke
>> 
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>> 
>> 
>> 
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


