Good morning Dan,

Without more detail on how your job is set up, I can only offer general 
comments:

reinterpretAsKeyedStream only applies to data streams that are in fact 
already partitioned by the same key, i.e. your job would look roughly like this:

DataStreamUtils.reinterpretAsKeyedStream(
        stream
            .keyBy(keyExtractor1)
            .process(keyedProcessFunction1), // or any of the other keyed operators
        keyExtractor2 …
    )
    .process(keyedProcessFunction2) // or any of the other keyed operators

keyExtractor1 and keyExtractor2 must yield the same key for related events, 
i.e. for an input of keyedProcessFunction1 and the output it produces from it.
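
On the question of a wrapper that verifies the key does not change: the idea 
can be sketched in plain Java, without any Flink types. The names KeyCheck, 
keyPreserving and keyOf below are hypothetical and not part of Flink; the 
"userId:payload" event format is only an assumption for the example. In a real 
job the same comparison would sit inside keyedProcessFunction1 or in a small 
operator right after it.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper, not a Flink API: fails fast when a transformation changes the key.
final class KeyCheck {

    /**
     * Wraps a per-record transformation so that the key of each output is
     * compared with the key of the input it was produced from.
     */
    static <IN, OUT, K> Function<IN, OUT> keyPreserving(
            Function<IN, OUT> body,
            Function<IN, K> inputKey,     // plays the role of keyExtractor1
            Function<OUT, K> outputKey) { // plays the role of keyExtractor2
        return in -> {
            OUT out = body.apply(in);
            K expected = inputKey.apply(in);
            K actual = outputKey.apply(out);
            if (!Objects.equals(expected, actual)) {
                throw new IllegalStateException(
                        "Key changed from " + expected + " to " + actual);
            }
            return out;
        };
    }

    /** Assumed event format "userId:payload"; the key is the part before the colon. */
    static String keyOf(String event) {
        return event.substring(0, event.indexOf(':'));
    }

    public static void main(String[] args) {
        Function<String, String> checked =
                KeyCheck.<String, String, String>keyPreserving(
                        e -> e + "!", KeyCheck::keyOf, KeyCheck::keyOf);
        // The payload changes but the key stays "u1", so the check passes.
        System.out.println(checked.apply("u1:click"));
    }
}
```

A check like this costs one extra key extraction per record, so it is probably 
best kept to tests or debugging runs.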

I assume your exception happens in keyedProcessFunction2?
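
For background on why a changed key would surface as this particular exception: 
Flink maps every key to a key group, and each parallel subtask owns a 
contiguous range of key groups. When a record arrives at a subtask whose range 
does not contain the key group of the record's key, registering a timer fails 
as in your stack trace. Below is a simplified model of that check. The class 
and method names are hypothetical, the hash is a stand-in rather than Flink's 
actual hash function, and 256 is just an example value for the maximum 
parallelism.

```java
// Simplified, hypothetical model of key-group ownership; not Flink's implementation.
final class KeyGroupSketch {

    /** Inclusive range of key groups owned by one parallel subtask. */
    static final class KeyGroupRange {
        final int start;
        final int end;

        KeyGroupRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }

        /** Maps a global key group to an index local to this subtask, or fails. */
        int toLocalIndex(int keyGroup) {
            if (!contains(keyGroup)) {
                throw new IllegalArgumentException(
                        "key group from " + start + " to " + end
                                + " does not contain " + keyGroup);
            }
            return keyGroup - start;
        }
    }

    /** Stand-in for the key-to-key-group mapping (Flink uses its own hash). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        // Range taken from the exception message in the quoted mail.
        KeyGroupRange owned = new KeyGroupRange(154, 156);
        System.out.println(owned.toLocalIndex(155));
        System.out.println(keyGroupFor("some-key", 256)); // 256 is an example value
        try {
            owned.toLocalIndex(213); // a key that belongs to another subtask
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model a record stays valid only while its key keeps mapping into the 
range of the subtask that processes it, which is why the two key extractors 
have to agree.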

reinterpretAsKeyedStream makes sense if you want to chain keyedProcessFunction1 
and keyedProcessFunction2 without a shuffle in between; otherwise a plain 
keyBy() will do.

I hope these hints help, otherwise feel free to get back to the mailing list 
with a more detailed description of your arrangement 😊

Cheers

Thias





From: Dan Hill <quietgol...@gmail.com>
Sent: Friday, 8 October 2021 06:49
To: user <user@flink.apache.org>
Subject: Helper methods for catching unexpected key changes?

Hi.  I'm getting the following errors when using reinterpretAsKeyedStream.  I 
don't expect the key to change for rows in reinterpretAsKeyedStream.  Are there 
any utilities that I can use with reinterpretAsKeyedStream to verify that the 
key doesn't change?  E.g. some wrapper operator?



2021-10-02 16:38:46
java.lang.IllegalArgumentException: key group from 154 to 156 does not contain 213
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
    at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)

