Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Fabian Hueske
If the data does not have a key (or you do not care about it), you can also use a FlatMapFunction (or ProcessFunction) with Operator State. Operator State is not bound to a key but to a parallel operator instance. Have a look at the ListCheckpointed interface and its JavaDocs. 2017-06-23 18:27
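The buffering that such a FlatMapFunction would do can be sketched in plain Java, without any Flink dependency. This is only an illustration under assumptions: `BatchBuffer`, `add`, and `pending` are hypothetical names, and the batch size of 100 comes from the DynamoDB use case mentioned elsewhere in this thread. In a real operator, the pending elements are what would have to be stored in Operator State so that a partially filled batch survives a failure.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: collects elements and hands back a batch once it is full. */
final class BatchBuffer<T> {
    private final int batchSize;
    private List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Adds one element; returns the full batch when the buffer fills up, otherwise null. */
    List<T> add(T element) {
        buffer.add(element);
        if (buffer.size() < batchSize) {
            return null;
        }
        List<T> full = buffer;
        buffer = new ArrayList<>(); // start a fresh batch
        return full;
    }

    /** Copy of the elements still waiting for a full batch (the part that would need checkpointing). */
    List<T> pending() {
        return new ArrayList<>(buffer);
    }
}
```

A caller would invoke `add` once per incoming record and issue the batched lookup whenever a non-null list comes back. Because each parallel instance keeps its own buffer, no keyBy() and therefore no repartitioning is involved.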

Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
So there is no way to do a countWindow(100) and preserve data locality? My use case is this: augment a data stream with new fields from a DynamoDB lookup. DynamoDB allows batch gets of up to 100 records, so I am trying to collect 100 records before making that call. I have no other reason to do a

Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Fabian Hueske
No, you will lose data locality if you use keyBy(), which is the only way to obtain a KeyedStream.

2017-06-23 17:52 GMT+02:00 Edward:
> Thanks, Fabian.
> In this case, I could just extend your idea by creating some deterministic
> multiplier of the subtask index:
>

Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
Thanks, Fabian. In this case, I could just extend your idea by creating some deterministic multiplier of the subtask index: RichMapFunction> keyByMap = new RichMapFunction>() { public Tuple2 map(String

Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Fabian Hueske
Flink hashes the keys and computes the target partition using modulo. This works well if you have many keys, but leads to skew if the number of keys is close to the number of partitions. If you use partitionCustom, you can explicitly define the target partition; however, partitionCustom does not
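The skew described above can be reproduced with a small plain-Java simulation. This is a sketch under assumptions, not Flink's actual partitioning code: `mix` is a stand-in integer hash (the murmur3 finalizer), and `KeySkewDemo` and its methods are hypothetical names. It only illustrates the general "hash, then modulo" scheme with 12 partitions, matching the parallelism mentioned in this thread.

```java
import java.util.Arrays;

final class KeySkewDemo {
    /** Stand-in 32-bit hash mixer; an assumption, not the function Flink uses. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 16;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Hash the key, then take it modulo the number of partitions. */
    static int targetPartition(int key, int numPartitions) {
        return Math.floorMod(mix(key), numPartitions);
    }

    /** Counts how many of the keys 0..numKeys-1 land on each partition. */
    static int[] keysPerPartition(int numKeys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int key = 0; key < numKeys; key++) {
            counts[targetPartition(key, numPartitions)]++;
        }
        return counts;
    }

    /** Number of partitions that received at least one key. */
    static long usedPartitions(int[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }
}
```

Comparing `usedPartitions(keysPerPartition(12, 12))` with `usedPartitions(keysPerPartition(120_000, 12))` shows the effect: with only as many keys as partitions, hash collisions can leave some partitions without any key, whereas a large key space spreads out evenly.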

Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
Hi Fabian - I've tried this idea of creating a KeyedStream based on getRuntimeContext().getIndexOfThisSubtask(). However, not all target subtasks are receiving records. All subtasks have a parallelism of 12, so I have 12 source subtasks and 12 target subtasks. I've confirmed that the call to

Re: Strange behavior of DataStream.countWindow

2016-06-13 Thread Fabian Hueske
If I understood you correctly, you want to compute windows in parallel without using a key. Are you aware that the results of such a computation are not deterministic and kind of arbitrary? If that is still OK for you, you can use a mapper to assign the current parallel index as a key field, i.e.,

Re: Strange behavior of DataStream.countWindow

2016-06-11 Thread Yukun Guo
Thx, now I use element.hashCode() % nPartitions and it works as expected. But I'm afraid it's not a best practice for just turning a plain (already parallelized) DataStream into a KeyedStream? Because it introduces some overhead due to physical repartitioning by key, which is unnecessary since I

Re: Strange behavior of DataStream.countWindow

2016-06-09 Thread Fabian Hueske
Hi Yukun, the problem is that the KeySelector is internally invoked multiple times. Hence it must be deterministic, i.e., it must extract the same key for the same object if invoked multiple times. The documentation does not discuss this aspect and should be extended. Thanks for pointing out
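The determinism requirement can be illustrated in plain Java, using `ToIntFunction` as a stand-in for a KeySelector. This is a sketch under assumptions: `KeySelectorDeterminism`, `randomKey`, `HASH_KEY`, `isStable`, and the partition count of 12 are all hypothetical names and values chosen for illustration. The random selector is a made-up counterexample; the hash-based one follows the element.hashCode() % nPartitions approach reported in this thread, with `Math.floorMod` added so the key is never negative.

```java
import java.util.Random;
import java.util.function.ToIntFunction;

final class KeySelectorDeterminism {
    static final int N_PARTITIONS = 12; // hypothetical value

    /** Non-deterministic: draws a new key on every call, so repeated calls can disagree. */
    static ToIntFunction<String> randomKey(Random rnd) {
        return element -> rnd.nextInt(N_PARTITIONS);
    }

    /** Deterministic: the key depends only on the element itself. */
    static final ToIntFunction<String> HASH_KEY =
        element -> Math.floorMod(element.hashCode(), N_PARTITIONS);

    /** Returns true if the selector yields the same key on every one of the invocations. */
    static boolean isStable(ToIntFunction<String> selector, String element, int invocations) {
        int first = selector.applyAsInt(element);
        for (int i = 1; i < invocations; i++) {
            if (selector.applyAsInt(element) != first) {
                return false;
            }
        }
        return true;
    }
}
```

Calling `isStable(HASH_KEY, "12345", 1000)` returns true, while the random selector fails the same check. A selector of the second kind can assign one element different keys on different invocations, which is the situation the requirement above rules out.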

Strange behavior of DataStream.countWindow

2016-06-09 Thread Yukun Guo
I’m playing with the (Window)WordCount example from Flink QuickStart. I generate a DataStream consisting of 1000 Strings of random digits, which is windowed with a tumbling count window of 50 elements: import org.apache.flink.api.common.functions.FlatMapFunction;import