Hi

From the doc[1], the DataStream MUST already be pre-partitioned in EXACTLY the
same way Flink’s keyBy would partition the data in a shuffle w.r.t.
key-group assignment.
You should make sure that each key lands in the right key-group, and that
each key-group lands on the right parallel subtask. You can refer to
KeyGroupRangeAssignment[2] for more information.
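
For example, here is a minimal sketch of how to check which key-group and
which subtask a given key maps to. The parallelism values below are
placeholders; use your job's actual settings:

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        // Placeholders: replace with your job's actual settings.
        int parallelism = 2;
        int maxParallelism =
                KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        String sessionKey = "some-session-key";

        // Key-group the key hashes into (uses key.hashCode() internally).
        int keyGroup =
                KeyGroupRangeAssignment.assignToKeyGroup(sessionKey, maxParallelism);
        // Index of the parallel subtask that owns this key-group.
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.printf("key=%s -> keyGroup=%d -> subtask=%d%n",
                sessionKey, keyGroup, subtask);
    }
}
```

If a record arrives at a subtask whose KeyGroupRange does not contain the
key's key-group, you get exactly the kind of exception shown in your
stack trace below.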

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html
Best,
Congxian


Robin Cassan <robin.cas...@contentsquare.com> wrote on Sat, Nov 30, 2019 at 12:17 AM:

> Hi all!
>
> We are trying to build a Flink job that consumes a Kafka topic, groups
> the incoming events into Session Windows according to a String extracted
> by parsing each message (we call it `SessionKey`), and does
> some processing on the windows before sending them to another Kafka
> topic.
> Our first implementation used a `keyBy` operator on the incoming
> messages before creating the window, but we realized that we could
> pre-partition our data by `SessionKey` when we insert it into the input
> Kafka topic with a custom component. This would avoid having to
> shuffle data around in Flink, since, for a given `SessionKey`, we would
> ensure that all messages with this key will end up in the same Kafka
> partition and thus be read by the same subtask, on a single
> TaskManager. This means that we should be able to create a keyed-stream
> from the incoming data without having to transfer data between
> TaskManagers.
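>
> Conceptually, the custom component does something like this on the
> producer side (a simplified sketch: the `SessionKeyPartitioner` name is
> ours, and it assumes the topic has exactly as many partitions as the
> Flink job has subtasks, with partition i consumed by subtask i):
>
> ```
> import java.util.Map;
>
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
> import org.apache.kafka.clients.producer.Partitioner;
> import org.apache.kafka.common.Cluster;
>
> // Sketch: route each record to the Kafka partition whose index matches
> // the Flink subtask that key-group assignment would pick for its key.
> public class SessionKeyPartitioner implements Partitioner {
>
>     // Placeholders: must match the Flink job's actual settings.
>     private static final int FLINK_PARALLELISM = 2;
>     private static final int FLINK_MAX_PARALLELISM =
>             KeyGroupRangeAssignment.computeDefaultMaxParallelism(FLINK_PARALLELISM);
>
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes,
>                          Object value, byte[] valueBytes, Cluster cluster) {
>         // `key` must be the same `SessionKey` string that our Flink
>         // KeySelector extracts, so that both sides hash identically.
>         return KeyGroupRangeAssignment.assignKeyToParallelOperator(
>                 key, FLINK_MAX_PARALLELISM, FLINK_PARALLELISM);
>     }
>
>     @Override
>     public void close() {}
>
>     @Override
>     public void configure(Map<String, ?> configs) {}
> }
> ```
> (Whether partition i really ends up on subtask i is exactly the part we
> are unsure about, as described below.)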
>
> To achieve that, we have used the `reinterpretAsKeyedStream` method
> instead of the previous `keyBy`. This got rid of the shuffling step,
> but we are wondering whether this is the right way to use this feature,
> and whether Flink can match the distribution of keys coming from Kafka
> with the key-groups assigned to each TaskManager.
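>
> In code, the change looks roughly like this (a minimal sketch; `Event`
> and `getSessionKey()` stand in for our actual message type and parsing
> logic):
>
> ```
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamUtils;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
>
> public class ReinterpretExample {
>
>     public static KeyedStream<Event, String> toKeyedStream(DataStream<Event> events) {
>         KeySelector<Event, String> bySessionKey = new KeySelector<Event, String>() {
>             @Override
>             public String getKey(Event event) {
>                 return event.getSessionKey();
>             }
>         };
>         // Previously: events.keyBy(bySessionKey), which shuffles records
>         // between subtasks. reinterpretAsKeyedStream instead trusts that
>         // the stream is already partitioned exactly as keyBy would do it.
>         return DataStreamUtils.reinterpretAsKeyedStream(events, bySessionKey);
>     }
> }
> ```
>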
> We have observed that, while attempting to trigger a savepoint, we
> would encounter exceptions that seem to indicate that the TaskManagers
> received data whose `SessionKey` didn't match their assigned key-groups.
> Here is one of the stack traces we saw while savepointing:
>
> ```
> java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
>   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
>   at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
> ```
> We are currently using Flink 1.8.2 on Kubernetes, savepointing to
> Amazon S3.
>
> Is our observation correct that Flink cannot automatically match the
> Kafka partitioning with the TaskManagers' assigned KeyGroups?
> And if so, do you have any pointers on how we could pre-partition our
> data in Kafka so that Flink can avoid shuffling data before creating
> the Session Windows?
>
> Cheers,
>
> Robin
>
