Hi Robin,

From the doc[1], the DataStream MUST already be pre-partitioned in EXACTLY the same way Flink's keyBy would partition the data in a shuffle with respect to key-group assignment. You need to make sure that each key lands in the correct key-group, and that each key-group is assigned to the correct parallel subtask. You can refer to KeyGroupRangeAssignment[2] for more information.
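As a rough illustration (this is just a sketch, not taken from the doc; `ExpectedSubtask` is a hypothetical helper name, and `maxParallelism`/`parallelism` stand in for your job's actual settings), the two utility methods from [2] let you compute which subtask Flink's keyBy would route a given key to:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Hypothetical helper: compute the subtask index that Flink's keyBy
// would route a key to. maxParallelism and parallelism must match the
// job's actual configuration, otherwise the result is meaningless.
public final class ExpectedSubtask {
    public static int forKey(String sessionKey, int maxParallelism, int parallelism) {
        // Flink assigns a key to a key group via murmurHash(key.hashCode()) % maxParallelism
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(sessionKey, maxParallelism);
        // Key groups are then mapped onto operator subtasks in contiguous ranges
        return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);
    }
}
```

A pre-partitioner on the Kafka side would then have to write each key to a partition that is consumed by exactly that subtask. Keep in mind that the Kafka source's own partition-to-subtask assignment must also line up, which is an extra assumption on top of the key-group math.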
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html

Best,
Congxian

Robin Cassan <robin.cas...@contentsquare.com> wrote on Sat, Nov 30, 2019 at 12:17 AM:

> Hi all!
>
> We are trying to build a Flink job that consumes a Kafka topic, groups
> the incoming events into Session Windows according to a String that can
> be generated by parsing the message (we call it `SessionKey`), and does
> some processing on the windows before sending them to another Kafka topic.
> Our first implementation used a `keyBy` operator on the incoming messages
> before creating the window, but we realized that we could pre-partition
> our data by `SessionKey` when we insert it into the input Kafka topic
> with a custom component. This would avoid having to shuffle data around
> in Flink since, for a given `SessionKey`, we would ensure that all
> messages with this key end up in the same Kafka partition and are thus
> read by the same subtask, on a single TaskManager. This means that we
> should be able to create a keyed stream from the incoming data without
> having to transfer data between TaskManagers.
>
> To achieve that, we have used the `reinterpretAsKeyedStream` method
> instead of the previous `keyBy`. This got rid of the shuffling step, but
> we are wondering if this is the right way of using this feature, and
> whether Flink can manage to match the distribution of keys from Kafka
> with the ones assigned to each TaskManager.
> We have observed that, while attempting to trigger a savepoint, we
> encounter exceptions that seem to indicate that the TaskManagers received
> data whose `SessionKey` didn't match their assigned keys.
> Here is one of the stack traces we saw while savepointing:
>
> ```
> java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
> ```
>
> We are currently using Flink 1.8.2 on Kubernetes, savepointing to Amazon S3.
>
> Is our observation about Flink not being able to match the Kafka
> partitioning with the TaskManagers' assigned KeyGroups correct?
> And if so, do you have any pointers on how we could pre-partition our
> data in Kafka so that Flink can avoid shuffling data before creating
> the Session Windows?
>
> Cheers,
>
> Robin