Hi Luke,

What’s your workflow look like?

The one I included in my email generates what I’d expect.

— Ken

> On Apr 29, 2023, at 7:22 PM, Luke Xiong <leix...@gmail.com> wrote:
> 
> Hi Marco and Ken,
> 
> Thanks for the tips. I tried setting runtime mode to BATCH and it seems to 
> work. However, I notice there are duplicate keys in the resulting stream. For 
> example, it has
> (hello, 2)
> (hello, 2)
> instead of
> (hello, 4)
> 
> I suspect there might be a bug in the equals method of the keyed object, but 
> it doesn't seem likely, because I can get expected result with
> .collect(Collectors.groupingByConcurrent(Function.identity(), 
> Collectors.counting()))
> with the same stream using the Java Stream API.
> Is there any other reason that causes it, and what should I do to get a 
> stream with only one element per key?
> -lxiong
> 
> On Sat, Apr 29, 2023 at 5:24 PM Ken Krugler <kkrugler_li...@transpac.com 
> <mailto:kkrugler_li...@transpac.com>> wrote:
> Hi Luke,
> 
> If you’re reading from a bounded source (like a file, and you’re not doing 
> any monitoring of the directory or the file to load more data) then yes, you 
> can get a final count using a stream.
> 
> The key point is that the Flink file source will send out a 
> Watermark.MAX_WATERMARK value when it is done, and this will trigger any 
> pending aggregations.
> 
> Though you do need to make sure your workflow mode is either explicitly 
> BOUNDED, or AUTOMATIC.
> 
> Something like…
> 
>         final StreamExecutionEnvironment env = ...;
>         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
>         
>         DataStream<String> lines = ...;
>         
>         lines
>             .flatMap(new SplitLineFunction())
>             .keyBy(t -> t.f0)
>             .sum(1)
>             .print();
>         
>         env.execute();
> 
> …with a typical line splitter, cheesy version like...
> 
>     private static class SplitLineFunction implements FlatMapFunction<String, 
> Tuple2<String, Integer>> {
> 
>         @Override
>         public void flatMap(String in, Collector<Tuple2<String, Integer>> 
> out) throws Exception {
>             for (String word : in.split(" ")) {
>                 out.collect(Tuple2.of(word, 1));
>             }
>         }
>     }
> 
> — Ken
> 
>> On Apr 28, 2023, at 10:56 PM, Luke Xiong <leix...@gmail.com 
>> <mailto:leix...@gmail.com>> wrote:
>> 
>> Dear experts,
>> 
>> Is it possible to write a WordCount job that uses the DataStream API, but 
>> make it behave like the batch version WordCount example?
>> 
>> More specifically, I hope the job can get a DataStream of the final (word, 
>> count) records when fed a text file.
>> 
>> For example, given a text file:
>> ```input.txt
>> hello hello world hello world
>> hello world world world hello world
>> ```
>> 
>> In the flink WordCount examples, the batch version outputs:
>> ```batch.version.output
>> hello 5
>> world 6
>> ```
>> 
>> while the stream version outputs:
>> ```stream.version.output
>> (hello,1)
>> (hello,2)
>> (world,1)
>> (hello,3)
>> (world,2)
>> (hello,4)
>> (world,3)
>> (world,4)
>> (world,5)
>> (hello,5)
>> (world,6)
>> ```
>> Is it possible to have a DataStream that only has two elements: (hello, 5) 
>> and (world, 6)?
>> 
>> Regards,
>> Luke
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



Reply via email to