Hi Mike,

It doesn't matter whether your pipeline already has a GroupByKey, because the file IO adds its own GroupByKey for the sharding. So the initial distribution of the data does not matter.
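
To make that concrete, here is a toy sketch in plain Java. It is not Beam or Flink code: the class and method names are made up for illustration, and both the round-robin shard assignment and the hash-modulo routing are simplifying assumptions, not the runner's actual logic. It only shows that once records are grouped by shard key, a single shard key can reach only one parallel subtask, however the records were distributed before.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: hypothetical names, not Beam or Flink APIs.
final class ShardRoutingSketch {

    // Counts how many distinct parallel subtasks receive data once the
    // records are grouped by their shard key.
    static int subtasksUsed(int numShards, int parallelism, int numRecords) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < numRecords; i++) {
            // Assumption: records are spread over shard keys round-robin.
            int shardKey = i % numShards;
            // Assumption: identical keys are routed to the same subtask by hash.
            int subtask = Math.floorMod(Integer.hashCode(shardKey), parallelism);
            used.add(subtask);
        }
        return used.size();
    }

    public static void main(String[] args) {
        // One shard key: every record lands on the same subtask.
        System.out.println(subtasksUsed(1, 4, 1000)); // prints 1
        // Four shard keys on four subtasks: the work is spread out.
        System.out.println(subtasksUsed(4, 4, 1000)); // prints 4
    }
}
```

In this model the only knob that changes the spread is the number of distinct shard keys, which is why the distribution upstream of the write does not help.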

> Here's my theory of what's happening right now: Beam translates GBK 1:1 to
> Flink's `.keyBy` & Flink's `.keyBy` serializes by all windows by key.

GroupByKey is not a 1:1 translation to Flink's `keyBy`. `keyBy` is used to partition the data such that all identical keys are on the same machine. Then we use Beam's ReduceFnRunner to process the windows per key.

> it looks like Flink supports a notion of `globalState` - i.e. state not tied
> to a window but just the key. The only way to make processing thread-safe in
> this world is to serialize all windows of the same key.

In Flink and Beam state is always scoped by key _and_ window.

> I'm not 100% sure, but Beam's semantics are slightly different - all windows
> are considered independent and there isn't a global state.

In Beam you can eagerly assign windows. In Flink windows are assigned lazily by a dedicated window operator which also keeps track of state per window and key. However, the same is true for Beam's GroupByKey which keeps track of the state per window and key.

In the end, the problem here is the sharding behavior of the file IO. You may want to use a different way of writing out data, or at least add a GroupByKey before applying the File IO if you do not want the windowing to be performed on a single machine, due to the file IO being set to one shard.

Thanks,
Max

On 11.06.19 19:58, Mike Kaplinskiy wrote:
> I'm a little confused - I think my pipeline does have a GroupByKey -
> specifically FileIO/WriteFiles adds one. Even if I add a Reshuffle.of()
> before the FileIO.Write, that GBK with 1 key is still there. Here's what
> it looks like from Flink's perspective:
> image.png
> The screenshot of hot keys from before was from inspecting the second
> vertex/stage.
>
> Here's my theory of what's happening right now: Beam translates GBK 1:1
> to Flink's `.keyBy` & Flink's `.keyBy` serializes by all windows by key.
> From reading
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
> - it looks like Flink supports a notion of `globalState` - i.e. state
> not tied to a window but just the key. The only way to make processing
> thread-safe in this world is to serialize all windows of the same key.
>
> I'm not 100% sure, but Beam's semantics are slightly different - all
> windows are considered independent and there isn't a global state. I'm
> not 100% sure, but it seems like the translation from Beam to Flink
> could potentially include the windows in the key of `.keyBy` to allow
> parallelizing window computations. Does this sound like a reasonable
> feature request?
>
> Mike
>
> Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.
>
> On Tue, Jun 11, 2019 at 6:14 AM Maximilian Michels wrote:
>> Thanks for clarifying. The output will be grouped by the shard key
>> before producing the shard. In case you set the number of shards to
>> 1, all data will go to a single TaskManager.
>>
>> For most use cases, e.g. windowed writes, the data is already
>> sharded naturally due to a GroupByKey operation. It looks like you
>> want to directly write out this data? Unfortunately with the FileIO
>> logic this is not possible.
>>
>> Cheers,
>> Max
>>
>> On 10.06.19 23:30, Mike Kaplinskiy wrote:
>>> At a high level, the pipeline looks something like this:
>>>
>>> pipeline
>>>   .apply("read kafka", KafkaIO.readBytes().updateConsumerProperties({"auto.offset.reset": "earliest"}))
>>>   .apply("xform", MapElements.via(...))
>>>   .apply("window", Window.into(FixedWindows.of(Duration.standardDays(1)))
>>>     .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)))
>>>     .discardingFiredPanes()
>>>     .withAllowedLateness(Duration.standardHours(1)))
>>>   .apply("write", FileIO.write().to("s3://...").withNumShards(1).withCompression(Compression.GZIP))
>>>
>>> Mike.
>>>
>>> On Mon, Jun 10, 2019 at 1:47 PM Ankur Goenka wrote:
>>>> Hi Mike, This can be because of the partitioning logic of the data.
>>>>
>>>> If possible, can you share your pipeline code at a high level.
>>>>
>>>> On Mon, Jun 10, 2019 at 12:58 PM Mike Kaplinskiy wrote:
>>>>> On Mon, Jun 10, 2019 at 6:51 AM Maximilian Michels wrote:
>>>>>> Hi Mike,
>>>>>>
>>>>>> If you set the number of shards to 1, you should get one
>>>>>> shard per window; unless you have "ignore windows" set to true.
>>>>>
>>>>> Right, that makes sense, and what I expected. The thing that I
>>>>> find a little puzzling is that a single Flink sub-task receives
>>>>> all of the data - I'd expect the actual work to be spread across
>>>>> runners since each window is independent.
>>>>>>
>>>>>>> (The way I'm checking this is via the Flink UI)
>>>>>>
>>>>>> I'm curious, how do you check this via the Flink UI?
>>>>>
>>>>> Attached a screenshot
>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On 09.06.19 22:33, Mike Kaplinskiy wrote:
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I’m using a Kafka source with a lot of watermark skew (i.e. new
>>>>>>> partitions were added to the topic over time). The sink is a
>>>>>>> FileIO.Write().withNumShards(1) to get ~ 1 file per day & an early
>>>>>>> trigger to write at most 40,000 records per file. Unfortunately it looks
>>>>>>> like there's only 1 shard trying to write files for all the various days
>>>>>>> instead of writing multiple days' files in parallel. (The way I'm
>>>>>>> checking this is via the Flink UI). Is there anything I could do here to
>>>>>>> parallelize the process? All of this is with the Flink runner.
>>>>>>>
>>>>>>> Mike.
