Thanks for your answers! We will have a look at adapting the Kafka source so
that each subtask is assigned the input partitions matching its assigned key
groups. If anyone has already done such a thing, I'd love your advice!
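To make the plan concrete, here is a minimal sketch of the producer-side
routing we are considering, following Congxian's pointer to
KeyGroupRangeAssignment. The class name `KeyGroupAwarePartitioner` is just
ours for illustration; the sketch assumes exactly one Kafka partition per
Flink subtask and the same maxParallelism on both sides:

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

/**
 * Sketch: route each record to the Kafka partition whose index equals the
 * Flink subtask that keyBy would send the key to. Assumes the topic has
 * exactly `parallelism` partitions and that `maxParallelism` equals the
 * Flink job's maxParallelism setting.
 */
public final class KeyGroupAwarePartitioner {

    private final int parallelism;     // Flink parallelism == Kafka partition count
    private final int maxParallelism;  // must equal the job's maxParallelism

    public KeyGroupAwarePartitioner(int parallelism, int maxParallelism) {
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }

    public int partitionFor(String sessionKey) {
        // Same two steps Flink applies for keyBy: key -> key group ...
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(sessionKey, maxParallelism);
        // ... then key group -> the operator subtask index that owns it.
        return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);
    }
}
```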
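On the consumer side, as Gyula notes, the built-in source's partition
assignment (the static `KafkaTopicPartitionAssigner.assign` in 1.8) offsets
partitions by a hash of the topic name and isn't pluggable, so we expect to
have to fork the consumer. With the partitioner above, the substituted
assignment would essentially be the identity mapping; this is a sketch of
what we have in mind, not an existing Flink API:

```
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Sketch for a forked consumer: with the producer-side partitioner above,
// Kafka partition i only ever holds keys owned by Flink subtask i, so
// subtask i should read exactly partition i.
public final class KeyGroupAlignedAssigner {
    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        return partition.getPartition() % numParallelSubtasks;
    }
}
```

One caveat we already see: this only holds while parallelism and
maxParallelism stay fixed, since rescaling would change which subtask owns
each key-group range. If anyone spots another pitfall, please shout.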
Cheers,
Robin

On Mon, Dec 2, 2019 at 08:48, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi!
>
> As far as I know, even if you pre-partition the data in Kafka exactly the
> same way the key groups would partition it, you have no guarantee that the
> Kafka consumer source will pick up the right partitions.
>
> It might work if you have exactly as many Kafka partitions as key groups /
> max parallelism, partitioned correctly, but even then you might need a
> custom source to get the correct partition assignment for the subtasks.
>
> Long story short, I believe the built-in Kafka source doesn't support what
> you want, but it should be possible to adapt it to do so.
>
> Cheers,
> Gyula
>
> On Mon, Dec 2, 2019, 03:49 Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi,
>>
>> From the doc[1]: the DataStream MUST already be pre-partitioned in
>> EXACTLY the same way Flink's keyBy would partition the data in a shuffle
>> w.r.t. key-group assignment.
>> You should make sure that each key lands in the right key group, and that
>> each key group lands on the right parallel subtask. You can refer to
>> KeyGroupRangeAssignment[2] for more information.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html
>>
>> Best,
>> Congxian
>>
>> On Sat, Nov 30, 2019 at 12:17 AM, Robin Cassan <robin.cas...@contentsquare.com> wrote:
>>
>>> Hi all!
>>>
>>> We are trying to build a Flink job that consumes a Kafka topic, groups
>>> the incoming events into session windows keyed by a String that can be
>>> derived from each message (we call it `SessionKey`), and does some
>>> processing on the windows before sending them to another Kafka topic.
>>>
>>> Our first implementation used a `keyBy` operator on the incoming
>>> messages before creating the window, but we realized that we could
>>> pre-partition our data by `SessionKey` with a custom component when we
>>> insert it into the input Kafka topic. This would avoid shuffling data
>>> around in Flink: for a given `SessionKey`, all messages with that key
>>> would end up in the same Kafka partition and thus be read by the same
>>> subtask, on a single TaskManager. This means we should be able to
>>> create a keyed stream from the incoming data without transferring data
>>> between TaskManagers.
>>>
>>> To achieve that, we replaced the previous `keyBy` with the
>>> `reinterpretAsKeyedStream` method. This got rid of the shuffling step,
>>> but we are wondering whether this is the right way of using the
>>> feature, and whether Flink can match the distribution of keys coming
>>> from Kafka with the key groups assigned to each TaskManager.
>>>
>>> We have observed that, while attempting to trigger a savepoint, we
>>> encounter exceptions which seem to indicate that the TaskManagers
>>> received data whose `SessionKey` didn't match their assigned key
>>> groups. Here is one of the stack traces we saw while savepointing:
>>>
>>> ```
>>> java.lang.IllegalArgumentException: Key group 0 is not in
>>> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>>>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
>>> ```
>>>
>>> We are currently using Flink 1.8.2 on Kubernetes, savepointing to
>>> Amazon S3.
>>>
>>> Is our observation correct that Flink cannot match the Kafka
>>> partitioning with the TaskManagers' assigned key groups? And if so, do
>>> you have any pointers on how we could pre-partition our data in Kafka
>>> so that Flink can avoid shuffling data before creating the session
>>> windows?
>>>
>>> Cheers,
>>>
>>> Robin