Hi Yukun,

the problem is that the KeySelector is internally invoked multiple times.
Hence it must be deterministic, i.e., it must extract the same key for the
same object if invoked multiple times.
The documentation is not discussing this aspect and should be extended.

Thanks for pointing out this issue.

Cheers,
Fabian


2016-06-09 13:19 GMT+02:00 Yukun Guo <gyk....@gmail.com>:

> I’m playing with the (Window)WordCount example from Flink QuickStart. I
> generate a DataStream consisting of 1000 Strings of random digits, which
> is windowed with a tumbling count window of 50 elements:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;import 
> org.apache.flink.api.java.functions.KeySelector;import 
> org.apache.flink.api.java.tuple.Tuple2;import 
> org.apache.flink.streaming.api.datastream.DataStream;import 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import 
> org.apache.flink.util.Collector;
> import java.util.Random;
> public class DigitCount {
>
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<String> text = env.fromElements(
>                 "14159265358979323846264338327950288419716939937510",
>                 "58209749445923078164062862089986280348253421170679",
>                 "82148086513282306647093844609550582231725359408128",
>                 "48111745028410270193852110555964462294895493038196",
>                 "44288109756659334461284756482337867831652712019091",
>                 "45648566923460348610454326648213393607260249141273",
>                 "72458700660631558817488152092096282925409171536436",
>                 "78925903600113305305488204665213841469519415116094",
>                 "33057270365759591953092186117381932611793105118548",
>                 "07446237996274956735188575272489122793818301194912",
>                 "98336733624406566430860213949463952247371907021798",
>                 "60943702770539217176293176752384674818467669405132",
>                 "00056812714526356082778577134275778960917363717872",
>                 "14684409012249534301465495853710507922796892589235",
>                 "42019956112129021960864034418159813629774771309960",
>                 "51870721134999999837297804995105973173281609631859",
>                 "50244594553469083026425223082533446850352619311881",
>                 "71010003137838752886587533208381420617177669147303",
>                 "59825349042875546873115956286388235378759375195778",
>                 "18577805321712268066130019278766111959092164201989"
>         );
>
>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>                 .flatMap(new Splitter())
>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>                     @Override
>                     public Integer getKey(Tuple2<Integer, Integer> x) throws 
> Exception {
>                         return x.f0 % 2;
>                     }
>                 })
>                 .countWindow(50)
>                 .sum(1);
>
>         digitCount.print();
>         env.execute();
>
>     }
>
>     public static final class Splitter implements FlatMapFunction<String, 
> Tuple2<Integer, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> 
> out) {
>             for (String token : value.split("")) {
>                 if (token.length() == 0) {
>                     continue;
>                 }
>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>             }
>         }
>     }
> }
>
> The code above will produce 19 lines of output which is reasonable as the
> 1000 digits will be keyed into 2 partitions where one partition contains
> 500+ elements and the other contains slightly fewer than 500 elements,
> therefore as a result one 50-digit window is ignored.
>
> So far so good, but if I replace the mod KeySelector with a random one:
>
> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>     private int nPartitions;
>     private Random random;
>
>     RandomKeySelector(int nPartitions) {
>         this.nPartitions = nPartitions;
>         random = new Random();
>     }
>
>     @Override
>     public Integer getKey(T dummy) throws Exception {
>         return random.nextInt(this.nPartitions);
>     }
> }
>
> and then
>
> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>
> it may generate 17 or 18 lines of output. How could that happen? Moreover,
> if I set the number of partitions to 10, in theory the lines of output
> should be no fewer than 11, but actually it can be only 9.
>
> Please help me understand why countWindow behaves like this.
>

Reply via email to