Re: Parallel computation of windows in Flink

2019-06-12 Thread Maximilian Michels
Hi Mike,

It doesn't matter if your pipeline has a GroupByKey, as the file IO will add a GroupByKey for the sharding. So the initial distribution does not matter.

> Here's my theory of what's happening right now: Beam translates GBK 1:1 to
> Flink's `.keyBy` & Flink's `.keyBy` serializes by all
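A minimal sketch of the idea (this is a conceptual model, not Beam's actual internals; "lines" is an assumed PCollection<String>): a sharded file write effectively assigns each element a shard key in [0, numShards) and groups by that key, so withNumShards(1) gives every element the same key and routes all data through one grouped key.

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    // Conceptual model of a sharded write. With numShards = 1 there is
    // exactly one distinct key, so the GroupByKey output (and the write
    // that follows it) lands on a single TaskManager.
    final int numShards = 1;
    PCollection<KV<Integer, Iterable<String>>> shards = lines
        .apply("AssignShard", MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.integers(), TypeDescriptors.strings()))
            .via((String line) -> KV.of(
                ThreadLocalRandom.current().nextInt(numShards), line)))
        .apply("GroupIntoShards", GroupByKey.<Integer, String>create());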

Re: Parallel computation of windows in Flink

2019-06-11 Thread Maximilian Michels
Thanks for clarifying.

The output will be grouped by the shard key before producing the shard. If you set the number of shards to 1, all data will go to a single TaskManager. For most use cases, e.g. windowed writes, the data is already sharded naturally due to a GroupByKey operation. It
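If exactly one file per window is not a hard requirement, one option is to raise the shard count so the shard-key grouping spreads across TaskManagers. A sketch, where "records" is an assumed PCollection<String> and the shard count and output path are placeholders, not values from the thread:

    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;

    // Trade "one file per window" for parallel writes: more shards means
    // more distinct shard keys, so the write work fans out across workers.
    records.apply("WriteWindowedFiles",
        FileIO.<String>write()
            .via(TextIO.sink())
            .to("/data/daily-output")   // placeholder path
            .withNumShards(8));         // placeholder; > 1 enables parallelism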

Re: Parallel computation of windows in Flink

2019-06-10 Thread Mike Kaplinskiy
At a high level, the pipeline looks something like this:

    pipeline
        .apply("read kafka", KafkaIO.readBytes()
            .updateConsumerProperties({"auto.offset.reset": "earliest"}))
        .apply("xform", MapElements.via(...))
        .apply("window", Window.into(FixedWindows.of(Duration.standardDays(1)))
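Filling in the elided pieces, a compilable sketch of that shape might look like the following. The bootstrap servers, topic, transform body, and output path are invented placeholders; the trigger reflects the daily window with early firings at 40,000 records described in the original message below.

    import java.nio.charset.StandardCharsets;
    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    public class DailyFilesPipeline {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        pipeline
            .apply("read kafka", KafkaIO.readBytes()
                .withBootstrapServers("kafka:9092")          // placeholder
                .withTopic("events")                         // placeholder
                .updateConsumerProperties(
                    ImmutableMap.<String, Object>of(
                        "auto.offset.reset", "earliest"))
                .withoutMetadata())
            .apply("xform", MapElements
                .into(TypeDescriptors.strings())
                .via((KV<byte[], byte[]> kv) ->              // placeholder xform
                    new String(kv.getValue(), StandardCharsets.UTF_8)))
            .apply("window", Window.<String>into(
                    FixedWindows.of(Duration.standardDays(1)))
                .triggering(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(40_000)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
            .apply("write", FileIO.<String>write()
                .via(TextIO.sink())
                .to("/data/daily-output")                    // placeholder
                .withNumShards(1));
        pipeline.run();
      }
    }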

Re: Parallel computation of windows in Flink

2019-06-10 Thread Ankur Goenka
Hi Mike,

This can be because of the partitioning logic of the data. If possible, can you share your pipeline code at a high level?

On Mon, Jun 10, 2019 at 12:58 PM Mike Kaplinskiy wrote:
>
> Ladder. The smart, modern way to insure your life.
>
> On Mon, Jun 10, 2019

Re: Parallel computation of windows in Flink

2019-06-10 Thread Maximilian Michels
Hi Mike,

If you set the number of shards to 1, you should get one shard per window, unless you have "ignore windows" set to true.

> (The way I'm checking this is via the Flink UI)

I'm curious, how do you check this via the Flink UI?

Cheers,
Max

On 09.06.19 22:33, Mike Kaplinskiy wrote:
> Hi
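One way to make the shard-per-window behavior visible in the output itself, independent of the Flink UI, is a custom file naming scheme. A sketch (the naming format here is invented, not from the thread):

    import org.apache.beam.sdk.io.FileIO;

    // Embed the window and shard index in each file name, so "one shard
    // per window" shows up directly in the output listing.
    FileIO.Write.FileNaming perWindowNaming =
        (window, pane, numShards, shardIndex, compression) ->
            String.format("%s-%05d-of-%05d.txt", window, shardIndex, numShards);

    // Attached to the write, e.g.:
    // FileIO.<String>write().via(TextIO.sink())
    //     .to("/data/daily-output").withNaming(perWindowNaming).withNumShards(1)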

Parallel computation of windows in Flink

2019-06-09 Thread Mike Kaplinskiy
Hi everyone,

I’m using a Kafka source with a lot of watermark skew (i.e. new partitions were added to the topic over time). The sink is a FileIO.Write().withNumShards(1) to get ~1 file per day & an early trigger to write at most 40,000 records per file. Unfortunately it looks like there's only 1
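For context on the watermark-skew side: KafkaIO tracks a watermark per reader, and the effective source watermark is their minimum, so partitions added late can hold the whole source back until they catch up. A sketch of deriving event time from the Kafka record create time (the one-minute out-of-order bound is a placeholder, as are the server and topic names):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.joda.time.Duration;

    KafkaIO.readBytes()
        .withBootstrapServers("kafka:9092")   // placeholder
        .withTopic("events")                  // placeholder
        // Use each record's create time for timestamps and watermarks,
        // tolerating up to 1 minute of out-of-order delivery (placeholder).
        .withCreateTime(Duration.standardMinutes(1));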