Thanks for your answers! We will have a look at adapting the Kafka source
to assign the input partitions depending on the assigned key groups. If
anyone has already done such a thing, I'd love your advice!
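
For anyone curious, here is a minimal sketch of the mapping we have in
mind, using Flink's KeyGroupRangeAssignment (the identity mapping between
key groups and Kafka partitions is our own assumption, not something Flink
provides):

```java
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.ArrayList;
import java.util.List;

public class KeyGroupAwarePartitionSelector {

    /**
     * Returns the Kafka partitions a given source subtask should read,
     * assuming the topic has exactly maxParallelism partitions and that
     * partition i holds the data of key group i (our assumption).
     */
    public static List<Integer> partitionsForSubtask(
            int maxParallelism, int parallelism, int subtaskIndex) {
        // Key groups owned by this subtask, exactly as Flink computes them.
        KeyGroupRange range = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                maxParallelism, parallelism, subtaskIndex);
        List<Integer> partitions = new ArrayList<>();
        for (int kg = range.getStartKeyGroup(); kg <= range.getEndKeyGroup(); kg++) {
            partitions.add(kg); // identity mapping: key group i <-> Kafka partition i
        }
        return partitions;
    }
}
```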

Cheers

Robin

On Mon, Dec 2, 2019 at 08:48, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi!
>
> As far as I know, even if you pre-partition the data in Kafka in exactly
> the same way as the key groups, you have no guarantee that the Kafka
> consumer source will pick up the right partitions.
>
> It might work if you have exactly as many Kafka partitions as key
> groups/max parallelism, partitioned correctly, but even then you might
> have to use a custom source to get the correct partition assignment for
> the subtasks.
>
> Long story short, I believe the built-in Kafka source doesn't support
> what you want, but it should be possible to adapt it to do so.
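>
> To illustrate the mismatch, here is a rough sketch (the modulo assignment
> below is a simplification of the consumer's behaviour for illustration,
> not its actual code):
>
> ```java
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>
> public class AssignmentMismatch {
>     public static void main(String[] args) {
>         int maxParallelism = 128; // Flink's default
>         int parallelism = 4;
>         String key = "some-session-key";
>
>         // Subtask that Flink expects to own this key, via key groups.
>         int expectedSubtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
>                 key, maxParallelism, parallelism);
>
>         // Simplified stand-in for the consumer's partition assignment:
>         // partition i goes to subtask i % parallelism, unrelated to key groups.
>         int kafkaPartition = 2; // wherever the producer put this key
>         int consumingSubtask = kafkaPartition % parallelism;
>
>         System.out.printf("expected=%d, consuming=%d%n", expectedSubtask, consumingSubtask);
>     }
> }
> ```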
>
> Cheers
> Gyula
>
> On Mon, Dec 2, 2019, 03:49 Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi
>>
>> From the doc[1], the DataStream MUST already be pre-partitioned in
>> EXACTLY the same way Flink's keyBy would partition the data in a shuffle
>> w.r.t. key-group assignment.
>> You should make sure that each key lands in the right key group, and
>> that each key group lands on the right parallel subtask. You can refer
>> to KeyGroupRangeAssignment[2] for more information.
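>>
>> As a small sketch, the two computations involved look like this (the
>> values are only for illustration):
>>
>> ```java
>> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>>
>> public class KeyGroupLookup {
>>     public static void main(String[] args) {
>>         int maxParallelism = 128; // Flink's default
>>         int parallelism = 4;
>>
>>         // 1) Which key group does a key fall into?
>>         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup("my-session-key", maxParallelism);
>>
>>         // 2) Which parallel subtask owns that key group?
>>         int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
>>                 maxParallelism, parallelism, keyGroup);
>>
>>         System.out.println("keyGroup=" + keyGroup + ", subtask=" + subtask);
>>     }
>> }
>> ```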
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html
>> Best,
>> Congxian
>>
>>
>> On Sat, Nov 30, 2019 at 12:17 AM, Robin Cassan <robin.cas...@contentsquare.com> wrote:
>>
>>> Hi all!
>>>
>>> We are trying to build a Flink job that consumes a Kafka topic, groups
>>> the incoming events into session windows according to a String derived
>>> from parsing each message (we call it `SessionKey`), and does some
>>> processing on the windows before sending them to another Kafka topic.
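>>>
>>> For reference, the shape of the job (simplified, with a hypothetical
>>> `Event` type, accessor names, and gap duration):
>>>
>>> ```java
>>> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>
>>> stream
>>>     .keyBy(event -> event.getSessionKey())           // the shuffle we would like to avoid
>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
>>>     .process(new SessionProcessFunction())           // our window logic (not shown)
>>>     .addSink(kafkaProducer);
>>> ```
>>>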
>>> Our first implementation used a `keyBy` operator on the incoming
>>> messages before creating the window, but we realized that we could
>>> pre-partition our data by `SessionKey` with a custom component when we
>>> insert it into the input Kafka topic. This would avoid shuffling data
>>> around in Flink: for a given `SessionKey`, all messages with that key
>>> would end up in the same Kafka partition and thus be read by the same
>>> subtask, on a single TaskManager. This means we should be able to
>>> create a keyed stream from the incoming data without transferring data
>>> between TaskManagers.
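>>>
>>> The custom component would be roughly the Kafka partitioner sketched
>>> below (a hypothetical sketch, assuming the topic has exactly as many
>>> partitions as the job's parallelism and that partition i ends up on
>>> subtask i, which is precisely what we are unsure about):
>>>
>>> ```java
>>> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>>> import org.apache.kafka.clients.producer.Partitioner;
>>> import org.apache.kafka.common.Cluster;
>>>
>>> import java.util.Map;
>>>
>>> /** Routes each record to the Kafka partition matching Flink's key-group assignment. */
>>> public class SessionKeyPartitioner implements Partitioner {
>>>
>>>     private static final int MAX_PARALLELISM = 128; // must match the Flink job's setting
>>>
>>>     @Override
>>>     public int partition(String topic, Object key, byte[] keyBytes,
>>>                          Object value, byte[] valueBytes, Cluster cluster) {
>>>         int numPartitions = cluster.partitionsForTopic(topic).size();
>>>         // Same computation Flink's keyBy does: key -> key group -> subtask index.
>>>         return KeyGroupRangeAssignment.assignKeyToParallelOperator(
>>>                 key.toString(), MAX_PARALLELISM, numPartitions);
>>>     }
>>>
>>>     @Override
>>>     public void close() {}
>>>
>>>     @Override
>>>     public void configure(Map<String, ?> configs) {}
>>> }
>>> ```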
>>>
>>> To achieve that, we used the `reinterpretAsKeyedStream` method instead
>>> of the previous `keyBy`. This got rid of the shuffling step, but we are
>>> wondering whether this is the right way to use this feature, and whether
>>> Flink can match the distribution of keys coming from Kafka with the key
>>> groups assigned to each TaskManager.
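>>>
>>> For context, this is essentially how we call it (simplified; `Event` and
>>> `getSessionKey` stand in for our actual types):
>>>
>>> ```java
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.streaming.api.datastream.DataStreamUtils;
>>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>>>
>>> // DataStream<Event> source = env.addSource(kafkaConsumer);
>>> KeyedStream<Event, String> keyed = DataStreamUtils.reinterpretAsKeyedStream(
>>>         source,
>>>         new KeySelector<Event, String>() {
>>>             @Override
>>>             public String getKey(Event event) {
>>>                 return event.getSessionKey();
>>>             }
>>>         });
>>> // keyed.window(...) then builds the session windows without a shuffle.
>>> ```
>>>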
>>> We have observed that, while attempting to trigger a savepoint, we
>>> encounter exceptions that seem to indicate that the TaskManagers
>>> received data whose `SessionKey` didn't match their assigned key groups.
>>> Here is one of the stack traces we saw while savepointing:
>>>
>>> ```
>>> java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>>>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
>>> ```
>>> We are currently using Flink 1.8.2 on Kubernetes, savepointing to
>>> Amazon S3.
>>>
>>> Is our understanding correct that Flink cannot match the Kafka
>>> partitioning with the key groups assigned to each TaskManager?
>>> And if so, do you have any pointers on how we could pre-partition our
>>> data in Kafka so that Flink can avoid shuffling data before creating
>>> the session windows?
>>>
>>> Cheers,
>>>
>>> Robin
>>>
>>>
>>
